Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 17dfc522

History | View | Annotate | Download (151.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overriden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.proc = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    self.__ssh = None
78

    
79
    for attr_name in self._OP_REQP:
80
      attr_val = getattr(op, attr_name, None)
81
      if attr_val is None:
82
        raise errors.OpPrereqError("Required parameter '%s' missing" %
83
                                   attr_name)
84
    if self.REQ_CLUSTER:
85
      if not cfg.IsCluster():
86
        raise errors.OpPrereqError("Cluster not initialized yet,"
87
                                   " use 'gnt-cluster init' first.")
88
      if self.REQ_MASTER:
89
        master = sstore.GetMasterNode()
90
        if master != utils.HostInfo().name:
91
          raise errors.OpPrereqError("Commands must be run on the master"
92
                                     " node %s" % master)
93

    
94
  def __GetSSH(self):
95
    """Returns the SshRunner object
96

97
    """
98
    if not self.__ssh:
99
      self.__ssh = ssh.SshRunner(self.sstore)
100
    return self.__ssh
101

    
102
  ssh = property(fget=__GetSSH)
103

    
104
  def CheckPrereq(self):
105
    """Check prerequisites for this LU.
106

107
    This method should check that the prerequisites for the execution
108
    of this LU are fulfilled. It can do internode communication, but
109
    it should be idempotent - no cluster or system changes are
110
    allowed.
111

112
    The method should raise errors.OpPrereqError in case something is
113
    not fulfilled. Its return value is ignored.
114

115
    This method should also update all the parameters of the opcode to
116
    their canonical form; e.g. a short node name must be fully
117
    expanded after this method has successfully completed (so that
118
    hooks, logging, etc. work correctly).
119

120
    """
121
    raise NotImplementedError
122

    
123
  def Exec(self, feedback_fn):
124
    """Execute the LU.
125

126
    This method should implement the actual work. It should raise
127
    errors.OpExecError for failures that are somewhat dealt with in
128
    code, or expected.
129

130
    """
131
    raise NotImplementedError
132

    
133
  def BuildHooksEnv(self):
134
    """Build hooks environment for this LU.
135

136
    This method should return a three-node tuple consisting of: a dict
137
    containing the environment that will be used for running the
138
    specific hook for this LU, a list of node names on which the hook
139
    should run before the execution, and a list of node names on which
140
    the hook should run after the execution.
141

142
    The keys of the dict must not have 'GANETI_' prefixed as this will
143
    be handled in the hooks runner. Also note additional keys will be
144
    added by the hooks runner. If the LU doesn't define any
145
    environment, an empty dict (and not None) should be returned.
146

147
    As for the node lists, the master should not be included in the
148
    them, as it will be added by the hooks runner in case this LU
149
    requires a cluster to run on (otherwise we don't have a node
150
    list). No nodes should be returned as an empty list (and not
151
    None).
152

153
    Note that if the HPATH for a LU class is None, this function will
154
    not be called.
155

156
    """
157
    raise NotImplementedError
158

    
159

    
160
class NoHooksLU(LogicalUnit):
161
  """Simple LU which runs no hooks.
162

163
  This LU is intended as a parent for other LogicalUnits which will
164
  run no hooks, in order to reduce duplicate code.
165

166
  """
167
  HPATH = None
168
  HTYPE = None
169

    
170
  def BuildHooksEnv(self):
171
    """Build hooks env.
172

173
    This is a no-op, since we don't run hooks.
174

175
    """
176
    return {}, [], []
177

    
178

    
179
def _AddHostToEtcHosts(hostname):
180
  """Wrapper around utils.SetEtcHostsEntry.
181

182
  """
183
  hi = utils.HostInfo(name=hostname)
184
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
185

    
186

    
187
def _RemoveHostFromEtcHosts(hostname):
188
  """Wrapper around utils.RemoveEtcHostsEntry.
189

190
  """
191
  hi = utils.HostInfo(name=hostname)
192
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
193
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
194

    
195

    
196
def _GetWantedNodes(lu, nodes):
197
  """Returns list of checked and expanded node names.
198

199
  Args:
200
    nodes: List of nodes (strings) or None for all
201

202
  """
203
  if not isinstance(nodes, list):
204
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
205

    
206
  if nodes:
207
    wanted = []
208

    
209
    for name in nodes:
210
      node = lu.cfg.ExpandNodeName(name)
211
      if node is None:
212
        raise errors.OpPrereqError("No such node name '%s'" % name)
213
      wanted.append(node)
214

    
215
  else:
216
    wanted = lu.cfg.GetNodeList()
217
  return utils.NiceSort(wanted)
218

    
219

    
220
def _GetWantedInstances(lu, instances):
221
  """Returns list of checked and expanded instance names.
222

223
  Args:
224
    instances: List of instances (strings) or None for all
225

226
  """
227
  if not isinstance(instances, list):
228
    raise errors.OpPrereqError("Invalid argument type 'instances'")
229

    
230
  if instances:
231
    wanted = []
232

    
233
    for name in instances:
234
      instance = lu.cfg.ExpandInstanceName(name)
235
      if instance is None:
236
        raise errors.OpPrereqError("No such instance name '%s'" % name)
237
      wanted.append(instance)
238

    
239
  else:
240
    wanted = lu.cfg.GetInstanceList()
241
  return utils.NiceSort(wanted)
242

    
243

    
244
def _CheckOutputFields(static, dynamic, selected):
245
  """Checks whether all selected fields are valid.
246

247
  Args:
248
    static: Static fields
249
    dynamic: Dynamic fields
250

251
  """
252
  static_fields = frozenset(static)
253
  dynamic_fields = frozenset(dynamic)
254

    
255
  all_fields = static_fields | dynamic_fields
256

    
257
  if not all_fields.issuperset(selected):
258
    raise errors.OpPrereqError("Unknown output fields selected: %s"
259
                               % ",".join(frozenset(selected).
260
                                          difference(all_fields)))
261

    
262

    
263
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
264
                          memory, vcpus, nics):
265
  """Builds instance related env variables for hooks from single variables.
266

267
  Args:
268
    secondary_nodes: List of secondary nodes as strings
269
  """
270
  env = {
271
    "OP_TARGET": name,
272
    "INSTANCE_NAME": name,
273
    "INSTANCE_PRIMARY": primary_node,
274
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
275
    "INSTANCE_OS_TYPE": os_type,
276
    "INSTANCE_STATUS": status,
277
    "INSTANCE_MEMORY": memory,
278
    "INSTANCE_VCPUS": vcpus,
279
  }
280

    
281
  if nics:
282
    nic_count = len(nics)
283
    for idx, (ip, bridge, mac) in enumerate(nics):
284
      if ip is None:
285
        ip = ""
286
      env["INSTANCE_NIC%d_IP" % idx] = ip
287
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
288
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
289
  else:
290
    nic_count = 0
291

    
292
  env["INSTANCE_NIC_COUNT"] = nic_count
293

    
294
  return env
295

    
296

    
297
def _BuildInstanceHookEnvByObject(instance, override=None):
298
  """Builds instance related env variables for hooks from an object.
299

300
  Args:
301
    instance: objects.Instance object of instance
302
    override: dict of values to override
303
  """
304
  args = {
305
    'name': instance.name,
306
    'primary_node': instance.primary_node,
307
    'secondary_nodes': instance.secondary_nodes,
308
    'os_type': instance.os,
309
    'status': instance.os,
310
    'memory': instance.memory,
311
    'vcpus': instance.vcpus,
312
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
313
  }
314
  if override:
315
    args.update(override)
316
  return _BuildInstanceHookEnv(**args)
317

    
318

    
319
def _HasValidVG(vglist, vgname):
320
  """Checks if the volume group list is valid.
321

322
  A non-None return value means there's an error, and the return value
323
  is the error message.
324

325
  """
326
  vgsize = vglist.get(vgname, None)
327
  if vgsize is None:
328
    return "volume group '%s' missing" % vgname
329
  elif vgsize < 20480:
330
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
331
            (vgname, vgsize))
332
  return None
333

    
334

    
335
def _InitSSHSetup(node):
336
  """Setup the SSH configuration for the cluster.
337

338

339
  This generates a dsa keypair for root, adds the pub key to the
340
  permitted hosts and adds the hostkey to its own known hosts.
341

342
  Args:
343
    node: the name of this host as a fqdn
344

345
  """
346
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
347

    
348
  for name in priv_key, pub_key:
349
    if os.path.exists(name):
350
      utils.CreateBackup(name)
351
    utils.RemoveFile(name)
352

    
353
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
354
                         "-f", priv_key,
355
                         "-q", "-N", ""])
356
  if result.failed:
357
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
358
                             result.output)
359

    
360
  f = open(pub_key, 'r')
361
  try:
362
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
363
  finally:
364
    f.close()
365

    
366

    
367
def _InitGanetiServerSetup(ss):
368
  """Setup the necessary configuration for the initial node daemon.
369

370
  This creates the nodepass file containing the shared password for
371
  the cluster and also generates the SSL certificate.
372

373
  """
374
  # Create pseudo random password
375
  randpass = sha.new(os.urandom(64)).hexdigest()
376
  # and write it into sstore
377
  ss.SetKey(ss.SS_NODED_PASS, randpass)
378

    
379
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
380
                         "-days", str(365*5), "-nodes", "-x509",
381
                         "-keyout", constants.SSL_CERT_FILE,
382
                         "-out", constants.SSL_CERT_FILE, "-batch"])
383
  if result.failed:
384
    raise errors.OpExecError("could not generate server ssl cert, command"
385
                             " %s had exitcode %s and error message %s" %
386
                             (result.cmd, result.exit_code, result.output))
387

    
388
  os.chmod(constants.SSL_CERT_FILE, 0400)
389

    
390
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
391

    
392
  if result.failed:
393
    raise errors.OpExecError("Could not start the node daemon, command %s"
394
                             " had exitcode %s and error %s" %
395
                             (result.cmd, result.exit_code, result.output))
396

    
397

    
398
def _CheckInstanceBridgesExist(instance):
399
  """Check that the brigdes needed by an instance exist.
400

401
  """
402
  # check bridges existance
403
  brlist = [nic.bridge for nic in instance.nics]
404
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
405
    raise errors.OpPrereqError("one or more target bridges %s does not"
406
                               " exist on destination node '%s'" %
407
                               (brlist, instance.primary_node))
408

    
409

    
410
class LUInitCluster(LogicalUnit):
411
  """Initialise the cluster.
412

413
  """
414
  HPATH = "cluster-init"
415
  HTYPE = constants.HTYPE_CLUSTER
416
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
417
              "def_bridge", "master_netdev"]
418
  REQ_CLUSTER = False
419

    
420
  def BuildHooksEnv(self):
421
    """Build hooks env.
422

423
    Notes: Since we don't require a cluster, we must manually add
424
    ourselves in the post-run node list.
425

426
    """
427
    env = {"OP_TARGET": self.op.cluster_name}
428
    return env, [], [self.hostname.name]
429

    
430
  def CheckPrereq(self):
431
    """Verify that the passed name is a valid one.
432

433
    """
434
    if config.ConfigWriter.IsCluster():
435
      raise errors.OpPrereqError("Cluster is already initialised")
436

    
437
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
438
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
439
        raise errors.OpPrereqError("Please prepare the cluster VNC"
440
                                   "password file %s" %
441
                                   constants.VNC_PASSWORD_FILE)
442

    
443
    self.hostname = hostname = utils.HostInfo()
444

    
445
    if hostname.ip.startswith("127."):
446
      raise errors.OpPrereqError("This host's IP resolves to the private"
447
                                 " range (%s). Please fix DNS or %s." %
448
                                 (hostname.ip, constants.ETC_HOSTS))
449

    
450
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
451
                         source=constants.LOCALHOST_IP_ADDRESS):
452
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
453
                                 " to %s,\nbut this ip address does not"
454
                                 " belong to this host."
455
                                 " Aborting." % hostname.ip)
456

    
457
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
458

    
459
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
460
                     timeout=5):
461
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")
462

    
463
    secondary_ip = getattr(self.op, "secondary_ip", None)
464
    if secondary_ip and not utils.IsValidIP(secondary_ip):
465
      raise errors.OpPrereqError("Invalid secondary ip given")
466
    if (secondary_ip and
467
        secondary_ip != hostname.ip and
468
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
469
                           source=constants.LOCALHOST_IP_ADDRESS))):
470
      raise errors.OpPrereqError("You gave %s as secondary IP,"
471
                                 " but it does not belong to this host." %
472
                                 secondary_ip)
473
    self.secondary_ip = secondary_ip
474

    
475
    # checks presence of the volume group given
476
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
477

    
478
    if vgstatus:
479
      raise errors.OpPrereqError("Error: %s" % vgstatus)
480

    
481
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
482
                    self.op.mac_prefix):
483
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
484
                                 self.op.mac_prefix)
485

    
486
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
487
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
488
                                 self.op.hypervisor_type)
489

    
490
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
491
    if result.failed:
492
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
493
                                 (self.op.master_netdev,
494
                                  result.output.strip()))
495

    
496
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
497
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
498
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
499
                                 " executable." % constants.NODE_INITD_SCRIPT)
500

    
501
  def Exec(self, feedback_fn):
502
    """Initialize the cluster.
503

504
    """
505
    clustername = self.clustername
506
    hostname = self.hostname
507

    
508
    # set up the simple store
509
    self.sstore = ss = ssconf.SimpleStore()
510
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
511
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
512
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
513
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
514
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
515

    
516
    # set up the inter-node password and certificate
517
    _InitGanetiServerSetup(ss)
518

    
519
    # start the master ip
520
    rpc.call_node_start_master(hostname.name)
521

    
522
    # set up ssh config and /etc/hosts
523
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
524
    try:
525
      sshline = f.read()
526
    finally:
527
      f.close()
528
    sshkey = sshline.split(" ")[1]
529

    
530
    _AddHostToEtcHosts(hostname.name)
531
    _InitSSHSetup(hostname.name)
532

    
533
    # init of cluster config file
534
    self.cfg = cfgw = config.ConfigWriter()
535
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
536
                    sshkey, self.op.mac_prefix,
537
                    self.op.vg_name, self.op.def_bridge)
538

    
539
    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
540

    
541

    
542
class LUDestroyCluster(NoHooksLU):
543
  """Logical unit for destroying the cluster.
544

545
  """
546
  _OP_REQP = []
547

    
548
  def CheckPrereq(self):
549
    """Check prerequisites.
550

551
    This checks whether the cluster is empty.
552

553
    Any errors are signalled by raising errors.OpPrereqError.
554

555
    """
556
    master = self.sstore.GetMasterNode()
557

    
558
    nodelist = self.cfg.GetNodeList()
559
    if len(nodelist) != 1 or nodelist[0] != master:
560
      raise errors.OpPrereqError("There are still %d node(s) in"
561
                                 " this cluster." % (len(nodelist) - 1))
562
    instancelist = self.cfg.GetInstanceList()
563
    if instancelist:
564
      raise errors.OpPrereqError("There are still %d instance(s) in"
565
                                 " this cluster." % len(instancelist))
566

    
567
  def Exec(self, feedback_fn):
568
    """Destroys the cluster.
569

570
    """
571
    master = self.sstore.GetMasterNode()
572
    if not rpc.call_node_stop_master(master):
573
      raise errors.OpExecError("Could not disable the master role")
574
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
575
    utils.CreateBackup(priv_key)
576
    utils.CreateBackup(pub_key)
577
    rpc.call_node_leave_cluster(master)
578

    
579

    
580
class LUVerifyCluster(NoHooksLU):
581
  """Verifies the cluster status.
582

583
  """
584
  _OP_REQP = []
585

    
586
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
587
                  remote_version, feedback_fn):
588
    """Run multiple tests against a node.
589

590
    Test list:
591
      - compares ganeti version
592
      - checks vg existance and size > 20G
593
      - checks config file checksum
594
      - checks ssh to other nodes
595

596
    Args:
597
      node: name of the node to check
598
      file_list: required list of files
599
      local_cksum: dictionary of local files and their checksums
600

601
    """
602
    # compares ganeti version
603
    local_version = constants.PROTOCOL_VERSION
604
    if not remote_version:
605
      feedback_fn(" - ERROR: connection to %s failed" % (node))
606
      return True
607

    
608
    if local_version != remote_version:
609
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
610
                      (local_version, node, remote_version))
611
      return True
612

    
613
    # checks vg existance and size > 20G
614

    
615
    bad = False
616
    if not vglist:
617
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
618
                      (node,))
619
      bad = True
620
    else:
621
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
622
      if vgstatus:
623
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
624
        bad = True
625

    
626
    # checks config file checksum
627
    # checks ssh to any
628

    
629
    if 'filelist' not in node_result:
630
      bad = True
631
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
632
    else:
633
      remote_cksum = node_result['filelist']
634
      for file_name in file_list:
635
        if file_name not in remote_cksum:
636
          bad = True
637
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
638
        elif remote_cksum[file_name] != local_cksum[file_name]:
639
          bad = True
640
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
641

    
642
    if 'nodelist' not in node_result:
643
      bad = True
644
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
645
    else:
646
      if node_result['nodelist']:
647
        bad = True
648
        for node in node_result['nodelist']:
649
          feedback_fn("  - ERROR: communication with node '%s': %s" %
650
                          (node, node_result['nodelist'][node]))
651
    hyp_result = node_result.get('hypervisor', None)
652
    if hyp_result is not None:
653
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
654
    return bad
655

    
656
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
657
    """Verify an instance.
658

659
    This function checks to see if the required block devices are
660
    available on the instance's node.
661

662
    """
663
    bad = False
664

    
665
    instancelist = self.cfg.GetInstanceList()
666
    if not instance in instancelist:
667
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
668
                      (instance, instancelist))
669
      bad = True
670

    
671
    instanceconfig = self.cfg.GetInstanceInfo(instance)
672
    node_current = instanceconfig.primary_node
673

    
674
    node_vol_should = {}
675
    instanceconfig.MapLVsByNode(node_vol_should)
676

    
677
    for node in node_vol_should:
678
      for volume in node_vol_should[node]:
679
        if node not in node_vol_is or volume not in node_vol_is[node]:
680
          feedback_fn("  - ERROR: volume %s missing on node %s" %
681
                          (volume, node))
682
          bad = True
683

    
684
    if not instanceconfig.status == 'down':
685
      if not instance in node_instance[node_current]:
686
        feedback_fn("  - ERROR: instance %s not running on node %s" %
687
                        (instance, node_current))
688
        bad = True
689

    
690
    for node in node_instance:
691
      if (not node == node_current):
692
        if instance in node_instance[node]:
693
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
694
                          (instance, node))
695
          bad = True
696

    
697
    return bad
698

    
699
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
700
    """Verify if there are any unknown volumes in the cluster.
701

702
    The .os, .swap and backup volumes are ignored. All other volumes are
703
    reported as unknown.
704

705
    """
706
    bad = False
707

    
708
    for node in node_vol_is:
709
      for volume in node_vol_is[node]:
710
        if node not in node_vol_should or volume not in node_vol_should[node]:
711
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
712
                      (volume, node))
713
          bad = True
714
    return bad
715

    
716
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
717
    """Verify the list of running instances.
718

719
    This checks what instances are running but unknown to the cluster.
720

721
    """
722
    bad = False
723
    for node in node_instance:
724
      for runninginstance in node_instance[node]:
725
        if runninginstance not in instancelist:
726
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
727
                          (runninginstance, node))
728
          bad = True
729
    return bad
730

    
731
  def CheckPrereq(self):
732
    """Check prerequisites.
733

734
    This has no prerequisites.
735

736
    """
737
    pass
738

    
739
  def Exec(self, feedback_fn):
740
    """Verify integrity of cluster, performing various test on nodes.
741

742
    """
743
    bad = False
744
    feedback_fn("* Verifying global settings")
745
    for msg in self.cfg.VerifyConfig():
746
      feedback_fn("  - ERROR: %s" % msg)
747

    
748
    vg_name = self.cfg.GetVGName()
749
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
750
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
751
    node_volume = {}
752
    node_instance = {}
753

    
754
    # FIXME: verify OS list
755
    # do local checksums
756
    file_names = list(self.sstore.GetFileList())
757
    file_names.append(constants.SSL_CERT_FILE)
758
    file_names.append(constants.CLUSTER_CONF_FILE)
759
    local_checksums = utils.FingerprintFiles(file_names)
760

    
761
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
762
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
763
    all_instanceinfo = rpc.call_instance_list(nodelist)
764
    all_vglist = rpc.call_vg_list(nodelist)
765
    node_verify_param = {
766
      'filelist': file_names,
767
      'nodelist': nodelist,
768
      'hypervisor': None,
769
      }
770
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
771
    all_rversion = rpc.call_version(nodelist)
772

    
773
    for node in nodelist:
774
      feedback_fn("* Verifying node %s" % node)
775
      result = self._VerifyNode(node, file_names, local_checksums,
776
                                all_vglist[node], all_nvinfo[node],
777
                                all_rversion[node], feedback_fn)
778
      bad = bad or result
779

    
780
      # node_volume
781
      volumeinfo = all_volumeinfo[node]
782

    
783
      if isinstance(volumeinfo, basestring):
784
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
785
                    (node, volumeinfo[-400:].encode('string_escape')))
786
        bad = True
787
        node_volume[node] = {}
788
      elif not isinstance(volumeinfo, dict):
789
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
790
        bad = True
791
        continue
792
      else:
793
        node_volume[node] = volumeinfo
794

    
795
      # node_instance
796
      nodeinstance = all_instanceinfo[node]
797
      if type(nodeinstance) != list:
798
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
799
        bad = True
800
        continue
801

    
802
      node_instance[node] = nodeinstance
803

    
804
    node_vol_should = {}
805

    
806
    for instance in instancelist:
807
      feedback_fn("* Verifying instance %s" % instance)
808
      result =  self._VerifyInstance(instance, node_volume, node_instance,
809
                                     feedback_fn)
810
      bad = bad or result
811

    
812
      inst_config = self.cfg.GetInstanceInfo(instance)
813

    
814
      inst_config.MapLVsByNode(node_vol_should)
815

    
816
    feedback_fn("* Verifying orphan volumes")
817
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
818
                                       feedback_fn)
819
    bad = bad or result
820

    
821
    feedback_fn("* Verifying remaining instances")
822
    result = self._VerifyOrphanInstances(instancelist, node_instance,
823
                                         feedback_fn)
824
    bad = bad or result
825

    
826
    return int(bad)
827

    
828

    
829
class LUVerifyDisks(NoHooksLU):
830
  """Verifies the cluster disks status.
831

832
  """
833
  _OP_REQP = []
834

    
835
  def CheckPrereq(self):
836
    """Check prerequisites.
837

838
    This has no prerequisites.
839

840
    """
841
    pass
842

    
843
  def Exec(self, feedback_fn):
844
    """Verify integrity of cluster disks.
845

846
    """
847
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
848

    
849
    vg_name = self.cfg.GetVGName()
850
    nodes = utils.NiceSort(self.cfg.GetNodeList())
851
    instances = [self.cfg.GetInstanceInfo(name)
852
                 for name in self.cfg.GetInstanceList()]
853

    
854
    nv_dict = {}
855
    for inst in instances:
856
      inst_lvs = {}
857
      if (inst.status != "up" or
858
          inst.disk_template not in constants.DTS_NET_MIRROR):
859
        continue
860
      inst.MapLVsByNode(inst_lvs)
861
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
862
      for node, vol_list in inst_lvs.iteritems():
863
        for vol in vol_list:
864
          nv_dict[(node, vol)] = inst
865

    
866
    if not nv_dict:
867
      return result
868

    
869
    node_lvs = rpc.call_volume_list(nodes, vg_name)
870

    
871
    to_act = set()
872
    for node in nodes:
873
      # node_volume
874
      lvs = node_lvs[node]
875

    
876
      if isinstance(lvs, basestring):
877
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
878
        res_nlvm[node] = lvs
879
      elif not isinstance(lvs, dict):
880
        logger.Info("connection to node %s failed or invalid data returned" %
881
                    (node,))
882
        res_nodes.append(node)
883
        continue
884

    
885
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
886
        inst = nv_dict.pop((node, lv_name), None)
887
        if (not lv_online and inst is not None
888
            and inst.name not in res_instances):
889
          res_instances.append(inst.name)
890

    
891
    # any leftover items in nv_dict are missing LVs, let's arrange the
892
    # data better
893
    for key, inst in nv_dict.iteritems():
894
      if inst.name not in res_missing:
895
        res_missing[inst.name] = []
896
      res_missing[inst.name].append(key)
897

    
898
    return result
899

    
900

    
901
class LURenameCluster(LogicalUnit):
902
  """Rename the cluster.
903

904
  """
905
  HPATH = "cluster-rename"
906
  HTYPE = constants.HTYPE_CLUSTER
907
  _OP_REQP = ["name"]
908

    
909
  def BuildHooksEnv(self):
910
    """Build hooks env.
911

912
    """
913
    env = {
914
      "OP_TARGET": self.sstore.GetClusterName(),
915
      "NEW_NAME": self.op.name,
916
      }
917
    mn = self.sstore.GetMasterNode()
918
    return env, [mn], [mn]
919

    
920
  def CheckPrereq(self):
921
    """Verify that the passed name is a valid one.
922

923
    """
924
    hostname = utils.HostInfo(self.op.name)
925

    
926
    new_name = hostname.name
927
    self.ip = new_ip = hostname.ip
928
    old_name = self.sstore.GetClusterName()
929
    old_ip = self.sstore.GetMasterIP()
930
    if new_name == old_name and new_ip == old_ip:
931
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
932
                                 " cluster has changed")
933
    if new_ip != old_ip:
934
      result = utils.RunCmd(["fping", "-q", new_ip])
935
      if not result.failed:
936
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
937
                                   " reachable on the network. Aborting." %
938
                                   new_ip)
939

    
940
    self.op.name = new_name
941

    
942
  def Exec(self, feedback_fn):
943
    """Rename the cluster.
944

945
    """
946
    clustername = self.op.name
947
    ip = self.ip
948
    ss = self.sstore
949

    
950
    # shutdown the master IP
951
    master = ss.GetMasterNode()
952
    if not rpc.call_node_stop_master(master):
953
      raise errors.OpExecError("Could not disable the master role")
954

    
955
    try:
956
      # modify the sstore
957
      ss.SetKey(ss.SS_MASTER_IP, ip)
958
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
959

    
960
      # Distribute updated ss config to all nodes
961
      myself = self.cfg.GetNodeInfo(master)
962
      dist_nodes = self.cfg.GetNodeList()
963
      if myself.name in dist_nodes:
964
        dist_nodes.remove(myself.name)
965

    
966
      logger.Debug("Copying updated ssconf data to all nodes")
967
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
968
        fname = ss.KeyToFilename(keyname)
969
        result = rpc.call_upload_file(dist_nodes, fname)
970
        for to_node in dist_nodes:
971
          if not result[to_node]:
972
            logger.Error("copy of file %s to node %s failed" %
973
                         (fname, to_node))
974
    finally:
975
      if not rpc.call_node_start_master(master):
976
        logger.Error("Could not re-enable the master role on the master,"
977
                     " please restart manually.")
978

    
979

    
980
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
981
  """Sleep and poll for an instance's disk to sync.
982

983
  """
984
  if not instance.disks:
985
    return True
986

    
987
  if not oneshot:
988
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
989

    
990
  node = instance.primary_node
991

    
992
  for dev in instance.disks:
993
    cfgw.SetDiskID(dev, node)
994

    
995
  retries = 0
996
  while True:
997
    max_time = 0
998
    done = True
999
    cumul_degraded = False
1000
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1001
    if not rstats:
1002
      proc.LogWarning("Can't get any data from node %s" % node)
1003
      retries += 1
1004
      if retries >= 10:
1005
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1006
                                 " aborting." % node)
1007
      time.sleep(6)
1008
      continue
1009
    retries = 0
1010
    for i in range(len(rstats)):
1011
      mstat = rstats[i]
1012
      if mstat is None:
1013
        proc.LogWarning("Can't compute data for node %s/%s" %
1014
                        (node, instance.disks[i].iv_name))
1015
        continue
1016
      # we ignore the ldisk parameter
1017
      perc_done, est_time, is_degraded, _ = mstat
1018
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1019
      if perc_done is not None:
1020
        done = False
1021
        if est_time is not None:
1022
          rem_time = "%d estimated seconds remaining" % est_time
1023
          max_time = est_time
1024
        else:
1025
          rem_time = "no time estimate"
1026
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1027
                     (instance.disks[i].iv_name, perc_done, rem_time))
1028
    if done or oneshot:
1029
      break
1030

    
1031
    if unlock:
1032
      utils.Unlock('cmd')
1033
    try:
1034
      time.sleep(min(60, max_time))
1035
    finally:
1036
      if unlock:
1037
        utils.Lock('cmd')
1038

    
1039
  if done:
1040
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1041
  return not cumul_degraded
1042

    
1043

    
1044
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1045
  """Check that mirrors are not degraded.
1046

1047
  The ldisk parameter, if True, will change the test from the
1048
  is_degraded attribute (which represents overall non-ok status for
1049
  the device(s)) to the ldisk (representing the local storage status).
1050

1051
  """
1052
  cfgw.SetDiskID(dev, node)
1053
  if ldisk:
1054
    idx = 6
1055
  else:
1056
    idx = 5
1057

    
1058
  result = True
1059
  if on_primary or dev.AssembleOnSecondary():
1060
    rstats = rpc.call_blockdev_find(node, dev)
1061
    if not rstats:
1062
      logger.ToStderr("Can't get any data from node %s" % node)
1063
      result = False
1064
    else:
1065
      result = result and (not rstats[idx])
1066
  if dev.children:
1067
    for child in dev.children:
1068
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1069

    
1070
  return result
1071

    
1072

    
1073
class LUDiagnoseOS(NoHooksLU):
1074
  """Logical unit for OS diagnose/query.
1075

1076
  """
1077
  _OP_REQP = []
1078

    
1079
  def CheckPrereq(self):
1080
    """Check prerequisites.
1081

1082
    This always succeeds, since this is a pure query LU.
1083

1084
    """
1085
    return
1086

    
1087
  def Exec(self, feedback_fn):
1088
    """Compute the list of OSes.
1089

1090
    """
1091
    node_list = self.cfg.GetNodeList()
1092
    node_data = rpc.call_os_diagnose(node_list)
1093
    if node_data == False:
1094
      raise errors.OpExecError("Can't gather the list of OSes")
1095
    return node_data
1096

    
1097

    
1098
class LURemoveNode(LogicalUnit):
1099
  """Logical unit for removing a node.
1100

1101
  """
1102
  HPATH = "node-remove"
1103
  HTYPE = constants.HTYPE_NODE
1104
  _OP_REQP = ["node_name"]
1105

    
1106
  def BuildHooksEnv(self):
1107
    """Build hooks env.
1108

1109
    This doesn't run on the target node in the pre phase as a failed
1110
    node would not allows itself to run.
1111

1112
    """
1113
    env = {
1114
      "OP_TARGET": self.op.node_name,
1115
      "NODE_NAME": self.op.node_name,
1116
      }
1117
    all_nodes = self.cfg.GetNodeList()
1118
    all_nodes.remove(self.op.node_name)
1119
    return env, all_nodes, all_nodes
1120

    
1121
  def CheckPrereq(self):
1122
    """Check prerequisites.
1123

1124
    This checks:
1125
     - the node exists in the configuration
1126
     - it does not have primary or secondary instances
1127
     - it's not the master
1128

1129
    Any errors are signalled by raising errors.OpPrereqError.
1130

1131
    """
1132
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1133
    if node is None:
1134
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1135

    
1136
    instance_list = self.cfg.GetInstanceList()
1137

    
1138
    masternode = self.sstore.GetMasterNode()
1139
    if node.name == masternode:
1140
      raise errors.OpPrereqError("Node is the master node,"
1141
                                 " you need to failover first.")
1142

    
1143
    for instance_name in instance_list:
1144
      instance = self.cfg.GetInstanceInfo(instance_name)
1145
      if node.name == instance.primary_node:
1146
        raise errors.OpPrereqError("Instance %s still running on the node,"
1147
                                   " please remove first." % instance_name)
1148
      if node.name in instance.secondary_nodes:
1149
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1150
                                   " please remove first." % instance_name)
1151
    self.op.node_name = node.name
1152
    self.node = node
1153

    
1154
  def Exec(self, feedback_fn):
1155
    """Removes the node from the cluster.
1156

1157
    """
1158
    node = self.node
1159
    logger.Info("stopping the node daemon and removing configs from node %s" %
1160
                node.name)
1161

    
1162
    rpc.call_node_leave_cluster(node.name)
1163

    
1164
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1165

    
1166
    logger.Info("Removing node %s from config" % node.name)
1167

    
1168
    self.cfg.RemoveNode(node.name)
1169

    
1170
    _RemoveHostFromEtcHosts(node.name)
1171

    
1172

    
1173
class LUQueryNodes(NoHooksLU):
1174
  """Logical unit for querying nodes.
1175

1176
  """
1177
  _OP_REQP = ["output_fields", "names"]
1178

    
1179
  def CheckPrereq(self):
1180
    """Check prerequisites.
1181

1182
    This checks that the fields required are valid output fields.
1183

1184
    """
1185
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1186
                                     "mtotal", "mnode", "mfree",
1187
                                     "bootid"])
1188

    
1189
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1190
                               "pinst_list", "sinst_list",
1191
                               "pip", "sip"],
1192
                       dynamic=self.dynamic_fields,
1193
                       selected=self.op.output_fields)
1194

    
1195
    self.wanted = _GetWantedNodes(self, self.op.names)
1196

    
1197
  def Exec(self, feedback_fn):
1198
    """Computes the list of nodes and their attributes.
1199

1200
    """
1201
    nodenames = self.wanted
1202
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1203

    
1204
    # begin data gathering
1205

    
1206
    if self.dynamic_fields.intersection(self.op.output_fields):
1207
      live_data = {}
1208
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1209
      for name in nodenames:
1210
        nodeinfo = node_data.get(name, None)
1211
        if nodeinfo:
1212
          live_data[name] = {
1213
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1214
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1215
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1216
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1217
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1218
            "bootid": nodeinfo['bootid'],
1219
            }
1220
        else:
1221
          live_data[name] = {}
1222
    else:
1223
      live_data = dict.fromkeys(nodenames, {})
1224

    
1225
    node_to_primary = dict([(name, set()) for name in nodenames])
1226
    node_to_secondary = dict([(name, set()) for name in nodenames])
1227

    
1228
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1229
                             "sinst_cnt", "sinst_list"))
1230
    if inst_fields & frozenset(self.op.output_fields):
1231
      instancelist = self.cfg.GetInstanceList()
1232

    
1233
      for instance_name in instancelist:
1234
        inst = self.cfg.GetInstanceInfo(instance_name)
1235
        if inst.primary_node in node_to_primary:
1236
          node_to_primary[inst.primary_node].add(inst.name)
1237
        for secnode in inst.secondary_nodes:
1238
          if secnode in node_to_secondary:
1239
            node_to_secondary[secnode].add(inst.name)
1240

    
1241
    # end data gathering
1242

    
1243
    output = []
1244
    for node in nodelist:
1245
      node_output = []
1246
      for field in self.op.output_fields:
1247
        if field == "name":
1248
          val = node.name
1249
        elif field == "pinst_list":
1250
          val = list(node_to_primary[node.name])
1251
        elif field == "sinst_list":
1252
          val = list(node_to_secondary[node.name])
1253
        elif field == "pinst_cnt":
1254
          val = len(node_to_primary[node.name])
1255
        elif field == "sinst_cnt":
1256
          val = len(node_to_secondary[node.name])
1257
        elif field == "pip":
1258
          val = node.primary_ip
1259
        elif field == "sip":
1260
          val = node.secondary_ip
1261
        elif field in self.dynamic_fields:
1262
          val = live_data[node.name].get(field, None)
1263
        else:
1264
          raise errors.ParameterError(field)
1265
        node_output.append(val)
1266
      output.append(node_output)
1267

    
1268
    return output
1269

    
1270

    
1271
class LUQueryNodeVolumes(NoHooksLU):
1272
  """Logical unit for getting volumes on node(s).
1273

1274
  """
1275
  _OP_REQP = ["nodes", "output_fields"]
1276

    
1277
  def CheckPrereq(self):
1278
    """Check prerequisites.
1279

1280
    This checks that the fields required are valid output fields.
1281

1282
    """
1283
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1284

    
1285
    _CheckOutputFields(static=["node"],
1286
                       dynamic=["phys", "vg", "name", "size", "instance"],
1287
                       selected=self.op.output_fields)
1288

    
1289

    
1290
  def Exec(self, feedback_fn):
1291
    """Computes the list of nodes and their attributes.
1292

1293
    """
1294
    nodenames = self.nodes
1295
    volumes = rpc.call_node_volumes(nodenames)
1296

    
1297
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1298
             in self.cfg.GetInstanceList()]
1299

    
1300
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1301

    
1302
    output = []
1303
    for node in nodenames:
1304
      if node not in volumes or not volumes[node]:
1305
        continue
1306

    
1307
      node_vols = volumes[node][:]
1308
      node_vols.sort(key=lambda vol: vol['dev'])
1309

    
1310
      for vol in node_vols:
1311
        node_output = []
1312
        for field in self.op.output_fields:
1313
          if field == "node":
1314
            val = node
1315
          elif field == "phys":
1316
            val = vol['dev']
1317
          elif field == "vg":
1318
            val = vol['vg']
1319
          elif field == "name":
1320
            val = vol['name']
1321
          elif field == "size":
1322
            val = int(float(vol['size']))
1323
          elif field == "instance":
1324
            for inst in ilist:
1325
              if node not in lv_by_node[inst]:
1326
                continue
1327
              if vol['name'] in lv_by_node[inst][node]:
1328
                val = inst.name
1329
                break
1330
            else:
1331
              val = '-'
1332
          else:
1333
            raise errors.ParameterError(field)
1334
          node_output.append(str(val))
1335

    
1336
        output.append(node_output)
1337

    
1338
    return output
1339

    
1340

    
1341
class LUAddNode(LogicalUnit):
1342
  """Logical unit for adding node to the cluster.
1343

1344
  """
1345
  HPATH = "node-add"
1346
  HTYPE = constants.HTYPE_NODE
1347
  _OP_REQP = ["node_name"]
1348

    
1349
  def BuildHooksEnv(self):
1350
    """Build hooks env.
1351

1352
    This will run on all nodes before, and on all nodes + the new node after.
1353

1354
    """
1355
    env = {
1356
      "OP_TARGET": self.op.node_name,
1357
      "NODE_NAME": self.op.node_name,
1358
      "NODE_PIP": self.op.primary_ip,
1359
      "NODE_SIP": self.op.secondary_ip,
1360
      }
1361
    nodes_0 = self.cfg.GetNodeList()
1362
    nodes_1 = nodes_0 + [self.op.node_name, ]
1363
    return env, nodes_0, nodes_1
1364

    
1365
  def CheckPrereq(self):
1366
    """Check prerequisites.
1367

1368
    This checks:
1369
     - the new node is not already in the config
1370
     - it is resolvable
1371
     - its parameters (single/dual homed) matches the cluster
1372

1373
    Any errors are signalled by raising errors.OpPrereqError.
1374

1375
    """
1376
    node_name = self.op.node_name
1377
    cfg = self.cfg
1378

    
1379
    dns_data = utils.HostInfo(node_name)
1380

    
1381
    node = dns_data.name
1382
    primary_ip = self.op.primary_ip = dns_data.ip
1383
    secondary_ip = getattr(self.op, "secondary_ip", None)
1384
    if secondary_ip is None:
1385
      secondary_ip = primary_ip
1386
    if not utils.IsValidIP(secondary_ip):
1387
      raise errors.OpPrereqError("Invalid secondary IP given")
1388
    self.op.secondary_ip = secondary_ip
1389
    node_list = cfg.GetNodeList()
1390
    if node in node_list:
1391
      raise errors.OpPrereqError("Node %s is already in the configuration"
1392
                                 % node)
1393

    
1394
    for existing_node_name in node_list:
1395
      existing_node = cfg.GetNodeInfo(existing_node_name)
1396
      if (existing_node.primary_ip == primary_ip or
1397
          existing_node.secondary_ip == primary_ip or
1398
          existing_node.primary_ip == secondary_ip or
1399
          existing_node.secondary_ip == secondary_ip):
1400
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1401
                                   " existing node %s" % existing_node.name)
1402

    
1403
    # check that the type of the node (single versus dual homed) is the
1404
    # same as for the master
1405
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1406
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1407
    newbie_singlehomed = secondary_ip == primary_ip
1408
    if master_singlehomed != newbie_singlehomed:
1409
      if master_singlehomed:
1410
        raise errors.OpPrereqError("The master has no private ip but the"
1411
                                   " new node has one")
1412
      else:
1413
        raise errors.OpPrereqError("The master has a private ip but the"
1414
                                   " new node doesn't have one")
1415

    
1416
    # checks reachablity
1417
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1418
      raise errors.OpPrereqError("Node not reachable by ping")
1419

    
1420
    if not newbie_singlehomed:
1421
      # check reachability from my secondary ip to newbie's secondary ip
1422
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1423
                           source=myself.secondary_ip):
1424
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1425
                                   " based ping to noded port")
1426

    
1427
    self.new_node = objects.Node(name=node,
1428
                                 primary_ip=primary_ip,
1429
                                 secondary_ip=secondary_ip)
1430

    
1431
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1432
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1433
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1434
                                   constants.VNC_PASSWORD_FILE)
1435

    
1436
  def Exec(self, feedback_fn):
1437
    """Adds the new node to the cluster.
1438

1439
    """
1440
    new_node = self.new_node
1441
    node = new_node.name
1442

    
1443
    # set up inter-node password and certificate and restarts the node daemon
1444
    gntpass = self.sstore.GetNodeDaemonPassword()
1445
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1446
      raise errors.OpExecError("ganeti password corruption detected")
1447
    f = open(constants.SSL_CERT_FILE)
1448
    try:
1449
      gntpem = f.read(8192)
1450
    finally:
1451
      f.close()
1452
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1453
    # so we use this to detect an invalid certificate; as long as the
1454
    # cert doesn't contain this, the here-document will be correctly
1455
    # parsed by the shell sequence below
1456
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1457
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1458
    if not gntpem.endswith("\n"):
1459
      raise errors.OpExecError("PEM must end with newline")
1460
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1461

    
1462
    # and then connect with ssh to set password and start ganeti-noded
1463
    # note that all the below variables are sanitized at this point,
1464
    # either by being constants or by the checks above
1465
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from the master to some nodes.

    The file to copy is taken from self.op.filename and it is sent to
    the nodes computed in CheckPrereq (all cluster nodes if the 'nodes'
    parameter was empty).

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Return a representation of the cluster config.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = self.ssh.Run(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple of (disks_ok, device_info), where device_info is a list of
    (host, instance_visible_name, node_visible_name) tuples describing
    the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
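

# Illustrative sketch (hedged, not part of Ganeti's API): how a caller can
# consume the two return values of _AssembleInstanceDisks. The helper name
# _ExampleAssembleDisks and its error text are hypothetical; real callers
# such as LUActivateInstanceDisks above behave similarly.
def _ExampleAssembleDisks(cfg, instance):
  """Assemble an instance's disks and log the node-to-device mapping."""
  disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
  if not disks_ok:
    raise errors.OpExecError("Cannot assemble disks for %s" % instance.name)
  for node, iv_name, device in device_info:
    logger.Info("disk %s of %s is visible on %s as %s" %
                (iv_name, instance.name, node, device))
  return device_info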


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  (they do not cause the function to return False).

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
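

# Illustrative sketch (hedged, not part of Ganeti's API): shutting down an
# instance's disks while tolerating failures on the primary node, the same
# way LUFailoverInstance uses this helper further down. The helper name is
# hypothetical.
def _ExampleShutdownDisks(cfg, instance):
  """Shut down all disks of an instance, ignoring primary-node failures."""
  if not _ShutdownInstanceDisks(instance, cfg, ignore_primary=True):
    logger.Error("some disks of %s could not be shut down" % instance.name)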


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
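

# Illustrative sketch (hedged, not part of Ganeti's API): _CheckNodeFreeMemory
# is a guard that either returns silently or raises OpPrereqError, which is
# how LUStartupInstance and LUFailoverInstance use it below. The helper name
# and the 512 MiB figure are only examples.
def _ExampleMemoryGuard(cfg, instance):
  """Verify the primary node has 512 MiB free before doing anything."""
  _CheckNodeFreeMemory(cfg, instance.primary_node,
                       "starting instance %s" % instance.name, 512)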


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node only.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
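

# Illustrative sketch (hedged, not part of Ganeti's API): creating one disk's
# device tree on the primary node, mirroring the call site in _CreateDisks
# below. The helper name is hypothetical.
def _ExampleCreatePrimaryTree(cfg, instance, device):
  """Create a single disk tree on the instance's primary node."""
  info = _GetInstanceInfoText(instance)
  if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                  instance, device, info):
    raise errors.OpExecError("Cannot create device %s on primary node" %
                             device.iv_name)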


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
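

# Illustrative sketch (hedged, not part of Ganeti's API): on secondary nodes
# the 'force' flag starts out False and only turns True below the first
# device type that reports CreateOnSecondary(), which is why _CreateDisks
# below passes False at the top of each disk tree. The helper name is
# hypothetical.
def _ExampleCreateSecondaryTrees(cfg, instance, device):
  """Create one disk tree on every secondary node of the instance."""
  info = _GetInstanceInfoText(instance)
  for snode in instance.secondary_nodes:
    if not _CreateBlockDevOnSecondary(cfg, snode, instance, device,
                                      False, info):
      logger.Error("cannot create device %s on node %s" %
                   (device.iv_name, snode))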


def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate one unique logical volume name for each of the
  given extensions, based on cluster-wide unique IDs.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
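

# Illustrative sketch (hedged, not part of Ganeti's API): the extensions are
# plain suffixes, so the result is a list of "<unique-id><suffix>" volume
# names, e.g. a data/meta pair for one DRBD disk. The helper name is
# hypothetical.
def _ExampleDrbdLvNames(cfg):
  """Return a (data, meta) pair of unique LV names for one DRBD disk."""
  data_name, meta_name = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta"])
  return data_name, meta_name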


def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
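

# Illustrative sketch (hedged, not part of Ganeti's API): building a single
# drbd8 disk by hand, the same way _GenerateDiskTemplate below does for the
# DT_DRBD8 template. The helper name and the 10240 MiB size are examples.
def _ExampleDrbd8Disk(cfg, primary, secondary):
  """Return a 10 GiB drbd8 Disk object named 'sda'."""
  names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta"])
  return _GenerateDRBD8Branch(cfg, primary, secondary, 10240, names, "sda")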


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_LOCAL_RAID1:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
                                       ".sdb_m1", ".sdb_m2"])
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[0]))
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[1]))
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
                              size=disk_sz,
                              children = [sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[2]))
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[3]))
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
                              size=swap_sz,
                              children = [sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_REMOTE_RAID1:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2])
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              children = [drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4])
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              children = [drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
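

# Illustrative sketch (hedged, not part of Ganeti's API): generating the
# complete disk layout for a drbd8-based instance; the sizes are example
# values in MiB and the helper name is hypothetical.
def _ExampleDrbd8Layout(cfg, instance_name, pnode, snode):
  """Return the sda/sdb disk objects for a 10 GiB disk and 4 GiB swap."""
  return _GenerateDiskTemplate(cfg, constants.DT_DRBD8, instance_name,
                               pnode, [snode], 10240, 4096)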


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False
  return True
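

# Illustrative sketch (hedged, not part of Ganeti's API): _CreateDisks returns
# a boolean instead of raising, so a caller typically removes any partially
# created devices itself before giving up. The helper name is hypothetical.
def _ExampleCreateDisksOrCleanUp(cfg, instance):
  """Create all disks of an instance, removing partial results on failure."""
  if not _CreateDisks(cfg, instance):
    _RemoveDisks(instance, cfg)
    raise errors.OpExecError("Device creation failed for %s" % instance.name)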


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result
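

# Illustrative sketch (hedged, not part of Ganeti's API): removal continues
# past individual failures, so the boolean result only says whether every
# device went away; LURemoveInstance above turns a False result into either
# a warning or an OpExecError depending on ignore_failures. The helper name
# is hypothetical.
def _ExampleRemoveDisks(cfg, instance):
  """Remove an instance's disks, logging (but tolerating) failures."""
  if not _RemoveDisks(instance, cfg):
    logger.Error("some block devices of %s were left behind" % instance.name)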


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: None,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
    }
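    # Worked example (illustrative values only): a drbd8 instance with a
    # 10240 MiB disk and a 4096 MiB swap needs 10240 + 4096 + 256 = 14592 MiB
    # of free space in the volume group on each of the two nodes.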

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError("Disk template '%s' size requirement"
                                   " is unknown" % self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # instance verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
3061
    else:
3062
      network_port = None
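    # only hypervisors listed in constants.HTS_REQ_PORT get a port from the
    # cluster configuration here; for the others the instance simply carries
    # network_port=None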
3063

    
3064
    disks = _GenerateDiskTemplate(self.cfg,
3065
                                  self.op.disk_template,
3066
                                  instance, pnode_name,
3067
                                  self.secondaries, self.op.disk_size,
3068
                                  self.op.swap_size)
3069

    
3070
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3071
                            primary_node=pnode_name,
3072
                            memory=self.op.mem_size,
3073
                            vcpus=self.op.vcpus,
3074
                            nics=[nic], disks=disks,
3075
                            disk_template=self.op.disk_template,
3076
                            status=self.instance_status,
3077
                            network_port=network_port,
3078
                            kernel_path=self.op.kernel_path,
3079
                            initrd_path=self.op.initrd_path,
3080
                            hvm_boot_order=self.op.hvm_boot_order,
3081
                            )
3082

    
3083
    feedback_fn("* creating instance disks...")
3084
    if not _CreateDisks(self.cfg, iobj):
3085
      _RemoveDisks(iobj, self.cfg)
3086
      raise errors.OpExecError("Device creation failed, reverting...")
3087

    
3088
    feedback_fn("adding instance %s to cluster config" % instance)
3089

    
3090
    self.cfg.AddInstance(iobj)
3091

    
3092
    if self.op.wait_for_sync:
3093
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3094
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3095
      # make sure the disks are not degraded (still sync-ing is ok)
3096
      time.sleep(15)
3097
      feedback_fn("* checking mirrors status")
3098
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3099
    else:
3100
      disk_abort = False
3101

    
3102
    if disk_abort:
3103
      _RemoveDisks(iobj, self.cfg)
3104
      self.cfg.RemoveInstance(iobj.name)
3105
      raise errors.OpExecError("There are some degraded disks for"
3106
                               " this instance")
3107

    
3108
    feedback_fn("creating os for instance %s on node %s" %
3109
                (instance, pnode_name))
3110

    
3111
    if iobj.disk_template != constants.DT_DISKLESS:
3112
      if self.op.mode == constants.INSTANCE_CREATE:
3113
        feedback_fn("* running the instance OS create scripts...")
3114
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3115
          raise errors.OpExecError("could not add os for instance %s"
3116
                                   " on node %s" %
3117
                                   (instance, pnode_name))
3118

    
3119
      elif self.op.mode == constants.INSTANCE_IMPORT:
3120
        feedback_fn("* running the instance OS import scripts...")
3121
        src_node = self.op.src_node
3122
        src_image = self.src_image
3123
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3124
                                                src_node, src_image):
3125
          raise errors.OpExecError("Could not import os for instance"
3126
                                   " %s on node %s" %
3127
                                   (instance, pnode_name))
3128
      else:
3129
        # also checked in the prereq part
3130
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3131
                                     % self.op.mode)
3132

    
3133
    if self.op.start:
3134
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3135
      feedback_fn("* starting instance...")
3136
      if not rpc.call_instance_start(pnode_name, iobj, None):
3137
        raise errors.OpExecError("Could not start instance")
3138

    
3139

    
3140
class LUConnectConsole(NoHooksLU):
3141
  """Connect to an instance's console.
3142

3143
  This is somewhat special in that it returns the command line that
3144
  you need to run on the master node in order to connect to the
3145
  console.
3146

3147
  """
3148
  _OP_REQP = ["instance_name"]
3149

    
3150
  def CheckPrereq(self):
3151
    """Check prerequisites.
3152

3153
    This checks that the instance is in the cluster.
3154

3155
    """
3156
    instance = self.cfg.GetInstanceInfo(
3157
      self.cfg.ExpandInstanceName(self.op.instance_name))
3158
    if instance is None:
3159
      raise errors.OpPrereqError("Instance '%s' not known" %
3160
                                 self.op.instance_name)
3161
    self.instance = instance
3162

    
3163
  def Exec(self, feedback_fn):
3164
    """Connect to the console of an instance
3165

3166
    """
3167
    instance = self.instance
3168
    node = instance.primary_node
3169

    
3170
    node_insts = rpc.call_instance_list([node])[node]
3171
    if node_insts is False:
3172
      raise errors.OpExecError("Can't connect to node %s." % node)
3173

    
3174
    if instance.name not in node_insts:
3175
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3176

    
3177
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3178

    
3179
    hyper = hypervisor.GetHypervisor()
3180
    console_cmd = hyper.GetShellCommandForConsole(instance)
3181

    
3182
    # build ssh cmdline
3183
    cmd = self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
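    # the return value below is a (program, full argv) pair derived from the
    # ssh command above, suitable for an execvp-style call on the master node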
3184
    return cmd[0], cmd
3185

    
3186

    
3187
class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
3191
  HPATH = "mirror-add"
3192
  HTYPE = constants.HTYPE_INSTANCE
3193
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3194

    
3195
  def BuildHooksEnv(self):
3196
    """Build hooks env.
3197

3198
    This runs on the master, the primary and all the secondaries.
3199

3200
    """
3201
    env = {
3202
      "NEW_SECONDARY": self.op.remote_node,
3203
      "DISK_NAME": self.op.disk_name,
3204
      }
3205
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3206
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3207
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3208
    return env, nl, nl
3209

    
3210
  def CheckPrereq(self):
3211
    """Check prerequisites.
3212

3213
    This checks that the instance is in the cluster.
3214

3215
    """
3216
    instance = self.cfg.GetInstanceInfo(
3217
      self.cfg.ExpandInstanceName(self.op.instance_name))
3218
    if instance is None:
3219
      raise errors.OpPrereqError("Instance '%s' not known" %
3220
                                 self.op.instance_name)
3221
    self.instance = instance
3222

    
3223
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3224
    if remote_node is None:
3225
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3226
    self.remote_node = remote_node
3227

    
3228
    if remote_node == instance.primary_node:
3229
      raise errors.OpPrereqError("The specified node is the primary node of"
3230
                                 " the instance.")
3231

    
3232
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3233
      raise errors.OpPrereqError("Instance's disk layout is not"
3234
                                 " remote_raid1.")
3235
    for disk in instance.disks:
3236
      if disk.iv_name == self.op.disk_name:
3237
        break
3238
    else:
3239
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3240
                                 " instance." % self.op.disk_name)
3241
    if len(disk.children) > 1:
3242
      raise errors.OpPrereqError("The device already has two slave devices."
3243
                                 " This would create a 3-disk raid1 which we"
3244
                                 " don't allow.")
3245
    self.disk = disk
3246

    
3247
  def Exec(self, feedback_fn):
3248
    """Add the mirror component
3249

3250
    """
3251
    disk = self.disk
3252
    instance = self.instance
3253

    
3254
    remote_node = self.remote_node
3255
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
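    # e.g. for disk 'sda' this asks for two LVs whose names are based on
    # '.sda_data' and '.sda_meta', made unique by _GenerateUniqueNames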
3256
    names = _GenerateUniqueNames(self.cfg, lv_names)
3257
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3258
                                     remote_node, disk.size, names)
3259

    
3260
    logger.Info("adding new mirror component on secondary")
3261
    #HARDCODE
3262
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3263
                                      new_drbd, False,
3264
                                      _GetInstanceInfoText(instance)):
3265
      raise errors.OpExecError("Failed to create new component on secondary"
3266
                               " node %s" % remote_node)
3267

    
3268
    logger.Info("adding new mirror component on primary")
3269
    #HARDCODE
3270
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3271
                                    instance, new_drbd,
3272
                                    _GetInstanceInfoText(instance)):
3273
      # remove secondary dev
3274
      self.cfg.SetDiskID(new_drbd, remote_node)
3275
      rpc.call_blockdev_remove(remote_node, new_drbd)
3276
      raise errors.OpExecError("Failed to create volume on primary")
3277

    
3278
    # the device exists now
3279
    # call the primary node to add the mirror to md
3280
    logger.Info("adding new mirror component to md")
3281
    if not rpc.call_blockdev_addchildren(instance.primary_node,
3282
                                         disk, [new_drbd]):
3283
      logger.Error("Can't add mirror compoment to md!")
3284
      self.cfg.SetDiskID(new_drbd, remote_node)
3285
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
3286
        logger.Error("Can't rollback on secondary")
3287
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
3288
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3289
        logger.Error("Can't rollback on primary")
3290
      raise errors.OpExecError("Can't add mirror component to md array")
3291

    
3292
    disk.children.append(new_drbd)
3293

    
3294
    self.cfg.AddInstance(instance)
3295

    
3296
    _WaitForSync(self.cfg, instance, self.proc)
3297

    
3298
    return 0
3299

    
3300

    
3301
class LURemoveMDDRBDComponent(LogicalUnit):
3302
  """Remove a component from a remote_raid1 disk.
3303

3304
  """
3305
  HPATH = "mirror-remove"
3306
  HTYPE = constants.HTYPE_INSTANCE
3307
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3308

    
3309
  def BuildHooksEnv(self):
3310
    """Build hooks env.
3311

3312
    This runs on the master, the primary and all the secondaries.
3313

3314
    """
3315
    env = {
3316
      "DISK_NAME": self.op.disk_name,
3317
      "DISK_ID": self.op.disk_id,
3318
      "OLD_SECONDARY": self.old_secondary,
3319
      }
3320
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3321
    nl = [self.sstore.GetMasterNode(),
3322
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3323
    return env, nl, nl
3324

    
3325
  def CheckPrereq(self):
3326
    """Check prerequisites.
3327

3328
    This checks that the instance is in the cluster.
3329

3330
    """
3331
    instance = self.cfg.GetInstanceInfo(
3332
      self.cfg.ExpandInstanceName(self.op.instance_name))
3333
    if instance is None:
3334
      raise errors.OpPrereqError("Instance '%s' not known" %
3335
                                 self.op.instance_name)
3336
    self.instance = instance
3337

    
3338
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3339
      raise errors.OpPrereqError("Instance's disk layout is not"
3340
                                 " remote_raid1.")
3341
    for disk in instance.disks:
3342
      if disk.iv_name == self.op.disk_name:
3343
        break
3344
    else:
3345
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3346
                                 " instance." % self.op.disk_name)
3347
    for child in disk.children:
3348
      if (child.dev_type == constants.LD_DRBD7 and
3349
          child.logical_id[2] == self.op.disk_id):
3350
        break
3351
    else:
3352
      raise errors.OpPrereqError("Can't find the device with this port.")
3353

    
3354
    if len(disk.children) < 2:
3355
      raise errors.OpPrereqError("Cannot remove the last component from"
3356
                                 " a mirror.")
3357
    self.disk = disk
3358
    self.child = child
3359
    if self.child.logical_id[0] == instance.primary_node:
3360
      oid = 1
3361
    else:
3362
      oid = 0
3363
    self.old_secondary = self.child.logical_id[oid]
3364

    
3365
  def Exec(self, feedback_fn):
3366
    """Remove the mirror component
3367

3368
    """
3369
    instance = self.instance
3370
    disk = self.disk
3371
    child = self.child
3372
    logger.Info("remove mirror component")
3373
    self.cfg.SetDiskID(disk, instance.primary_node)
3374
    if not rpc.call_blockdev_removechildren(instance.primary_node,
3375
                                            disk, [child]):
3376
      raise errors.OpExecError("Can't remove child from mirror.")
3377

    
3378
    for node in child.logical_id[:2]:
3379
      self.cfg.SetDiskID(child, node)
3380
      if not rpc.call_blockdev_remove(node, child):
3381
        logger.Error("Warning: failed to remove device from node %s,"
3382
                     " continuing operation." % node)
3383

    
3384
    disk.children.remove(child)
3385
    self.cfg.AddInstance(instance)
3386

    
3387

    
3388
class LUReplaceDisks(LogicalUnit):
3389
  """Replace the disks of an instance.
3390

3391
  """
3392
  HPATH = "mirrors-replace"
3393
  HTYPE = constants.HTYPE_INSTANCE
3394
  _OP_REQP = ["instance_name", "mode", "disks"]
3395

    
3396
  def BuildHooksEnv(self):
3397
    """Build hooks env.
3398

3399
    This runs on the master, the primary and all the secondaries.
3400

3401
    """
3402
    env = {
3403
      "MODE": self.op.mode,
3404
      "NEW_SECONDARY": self.op.remote_node,
3405
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3406
      }
3407
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3408
    nl = [
3409
      self.sstore.GetMasterNode(),
3410
      self.instance.primary_node,
3411
      ]
3412
    if self.op.remote_node is not None:
3413
      nl.append(self.op.remote_node)
3414
    return env, nl, nl
3415

    
3416
  def CheckPrereq(self):
3417
    """Check prerequisites.
3418

3419
    This checks that the instance is in the cluster.
3420

3421
    """
3422
    instance = self.cfg.GetInstanceInfo(
3423
      self.cfg.ExpandInstanceName(self.op.instance_name))
3424
    if instance is None:
3425
      raise errors.OpPrereqError("Instance '%s' not known" %
3426
                                 self.op.instance_name)
3427
    self.instance = instance
3428
    self.op.instance_name = instance.name
3429

    
3430
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3431
      raise errors.OpPrereqError("Instance's disk layout is not"
3432
                                 " network mirrored.")
3433

    
3434
    if len(instance.secondary_nodes) != 1:
3435
      raise errors.OpPrereqError("The instance has a strange layout,"
3436
                                 " expected one secondary but found %d" %
3437
                                 len(instance.secondary_nodes))
3438

    
3439
    self.sec_node = instance.secondary_nodes[0]
3440

    
3441
    remote_node = getattr(self.op, "remote_node", None)
3442
    if remote_node is not None:
3443
      remote_node = self.cfg.ExpandNodeName(remote_node)
3444
      if remote_node is None:
3445
        raise errors.OpPrereqError("Node '%s' not known" %
3446
                                   self.op.remote_node)
3447
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3448
    else:
3449
      self.remote_node_info = None
3450
    if remote_node == instance.primary_node:
3451
      raise errors.OpPrereqError("The specified node is the primary node of"
3452
                                 " the instance.")
3453
    elif remote_node == self.sec_node:
3454
      if self.op.mode == constants.REPLACE_DISK_SEC:
3455
        # this is for DRBD8, where we can't execute the same mode of
3456
        # replacement as for drbd7 (no different port allocated)
3457
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3458
                                   " replacement")
3459
      # the user gave the current secondary, switch to
3460
      # 'no-replace-secondary' mode for drbd7
3461
      remote_node = None
3462
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3463
        self.op.mode != constants.REPLACE_DISK_ALL):
3464
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3465
                                 " disks replacement, not individual ones")
3466
    if instance.disk_template == constants.DT_DRBD8:
3467
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3468
          remote_node is not None):
3469
        # switch to replace secondary mode
3470
        self.op.mode = constants.REPLACE_DISK_SEC
3471

    
3472
      if self.op.mode == constants.REPLACE_DISK_ALL:
3473
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3474
                                   " secondary disk replacement, not"
3475
                                   " both at once")
3476
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3477
        if remote_node is not None:
3478
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3479
                                     " the secondary while doing a primary"
3480
                                     " node disk replacement")
3481
        self.tgt_node = instance.primary_node
3482
        self.oth_node = instance.secondary_nodes[0]
3483
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3484
        self.new_node = remote_node # this can be None, in which case
3485
                                    # we don't change the secondary
3486
        self.tgt_node = instance.secondary_nodes[0]
3487
        self.oth_node = instance.primary_node
3488
      else:
3489
        raise errors.ProgrammerError("Unhandled disk replace mode")
3490

    
3491
    for name in self.op.disks:
3492
      if instance.FindDisk(name) is None:
3493
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3494
                                   (name, instance.name))
3495
    self.op.remote_node = remote_node
3496

    
3497
  def _ExecRR1(self, feedback_fn):
3498
    """Replace the disks of an instance.
3499

3500
    """
3501
    instance = self.instance
3502
    iv_names = {}
3503
    # start of work
3504
    if self.op.remote_node is None:
3505
      remote_node = self.sec_node
3506
    else:
3507
      remote_node = self.op.remote_node
3508
    cfg = self.cfg
3509
    for dev in instance.disks:
3510
      size = dev.size
3511
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3512
      names = _GenerateUniqueNames(cfg, lv_names)
3513
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3514
                                       remote_node, size, names)
3515
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3516
      logger.Info("adding new mirror component on secondary for %s" %
3517
                  dev.iv_name)
3518
      #HARDCODE
3519
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3520
                                        new_drbd, False,
3521
                                        _GetInstanceInfoText(instance)):
3522
        raise errors.OpExecError("Failed to create new component on secondary"
3523
                                 " node %s. Full abort, cleanup manually!" %
3524
                                 remote_node)
3525

    
3526
      logger.Info("adding new mirror component on primary")
3527
      #HARDCODE
3528
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3529
                                      instance, new_drbd,
3530
                                      _GetInstanceInfoText(instance)):
3531
        # remove secondary dev
3532
        cfg.SetDiskID(new_drbd, remote_node)
3533
        rpc.call_blockdev_remove(remote_node, new_drbd)
3534
        raise errors.OpExecError("Failed to create volume on primary!"
3535
                                 " Full abort, cleanup manually!!")
3536

    
3537
      # the device exists now
3538
      # call the primary node to add the mirror to md
3539
      logger.Info("adding new mirror component to md")
3540
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3541
                                           [new_drbd]):
3542
        logger.Error("Can't add mirror compoment to md!")
3543
        cfg.SetDiskID(new_drbd, remote_node)
3544
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3545
          logger.Error("Can't rollback on secondary")
3546
        cfg.SetDiskID(new_drbd, instance.primary_node)
3547
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3548
          logger.Error("Can't rollback on primary")
3549
        raise errors.OpExecError("Full abort, cleanup manually!!")
3550

    
3551
      dev.children.append(new_drbd)
3552
      cfg.AddInstance(instance)
3553

    
3554
    # this can fail as the old devices are degraded and _WaitForSync
3555
    # does a combined result over all disks, so we don't check its
3556
    # return value
3557
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3558

    
3559
    # so check manually all the devices
3560
    for name in iv_names:
3561
      dev, child, new_drbd = iv_names[name]
3562
      cfg.SetDiskID(dev, instance.primary_node)
3563
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3564
      if is_degr:
3565
        raise errors.OpExecError("MD device %s is degraded!" % name)
3566
      cfg.SetDiskID(new_drbd, instance.primary_node)
3567
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3568
      if is_degr:
3569
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3570

    
3571
    for name in iv_names:
3572
      dev, child, new_drbd = iv_names[name]
3573
      logger.Info("remove mirror %s component" % name)
3574
      cfg.SetDiskID(dev, instance.primary_node)
3575
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3576
                                              dev, [child]):
3577
        logger.Error("Can't remove child from mirror, aborting"
3578
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3579
        continue
3580

    
3581
      for node in child.logical_id[:2]:
3582
        logger.Info("remove child device on %s" % node)
3583
        cfg.SetDiskID(child, node)
3584
        if not rpc.call_blockdev_remove(node, child):
3585
          logger.Error("Warning: failed to remove device from node %s,"
3586
                       " continuing operation." % node)
3587

    
3588
      dev.children.remove(child)
3589

    
3590
      cfg.AddInstance(instance)
3591

    
3592
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced-<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced-<time_t>)

    Failures are not very well handled.

    """
3609
    steps_total = 6
3610
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3611
    instance = self.instance
3612
    iv_names = {}
3613
    vgname = self.cfg.GetVGName()
3614
    # start of work
3615
    cfg = self.cfg
3616
    tgt_node = self.tgt_node
3617
    oth_node = self.oth_node
3618

    
3619
    # Step: check device activation
3620
    self.proc.LogStep(1, steps_total, "check device existence")
3621
    info("checking volume groups")
3622
    my_vg = cfg.GetVGName()
3623
    results = rpc.call_vg_list([oth_node, tgt_node])
3624
    if not results:
3625
      raise errors.OpExecError("Can't list volume groups on the nodes")
3626
    for node in oth_node, tgt_node:
3627
      res = results.get(node, False)
3628
      if not res or my_vg not in res:
3629
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3630
                                 (my_vg, node))
3631
    for dev in instance.disks:
3632
      if not dev.iv_name in self.op.disks:
3633
        continue
3634
      for node in tgt_node, oth_node:
3635
        info("checking %s on %s" % (dev.iv_name, node))
3636
        cfg.SetDiskID(dev, node)
3637
        if not rpc.call_blockdev_find(node, dev):
3638
          raise errors.OpExecError("Can't find device %s on node %s" %
3639
                                   (dev.iv_name, node))
3640

    
3641
    # Step: check other node consistency
3642
    self.proc.LogStep(2, steps_total, "check peer consistency")
3643
    for dev in instance.disks:
3644
      if not dev.iv_name in self.op.disks:
3645
        continue
3646
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3647
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3648
                                   oth_node==instance.primary_node):
3649
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3650
                                 " to replace disks on this node (%s)" %
3651
                                 (oth_node, tgt_node))
3652

    
3653
    # Step: create new storage
3654
    self.proc.LogStep(3, steps_total, "allocate new storage")
3655
    for dev in instance.disks:
3656
      if not dev.iv_name in self.op.disks:
3657
        continue
3658
      size = dev.size
3659
      cfg.SetDiskID(dev, tgt_node)
3660
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3661
      names = _GenerateUniqueNames(cfg, lv_names)
3662
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3663
                             logical_id=(vgname, names[0]))
3664
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3665
                             logical_id=(vgname, names[1]))
3666
      new_lvs = [lv_data, lv_meta]
3667
      old_lvs = dev.children
3668
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3669
      info("creating new local storage on %s for %s" %
3670
           (tgt_node, dev.iv_name))
3671
      # since we *always* want to create this LV, we use the
3672
      # _Create...OnPrimary (which forces the creation), even if we
3673
      # are talking about the secondary node
3674
      for new_lv in new_lvs:
3675
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3676
                                        _GetInstanceInfoText(instance)):
3677
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3678
                                   " node '%s'" %
3679
                                   (new_lv.logical_id[1], tgt_node))
3680

    
3681
    # Step: for each lv, detach+rename*2+attach
3682
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3683
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3684
      info("detaching %s drbd from local storage" % dev.iv_name)
3685
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3686
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3687
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3688
      #dev.children = []
3689
      #cfg.Update(instance)
3690

    
3691
      # ok, we created the new LVs, so now we know we have the needed
3692
      # storage; as such, we proceed on the target node to rename
3693
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3694
      # using the assumption that logical_id == physical_id (which in
3695
      # turn is the unique_id on that node)
3696

    
3697
      # FIXME(iustin): use a better name for the replaced LVs
3698
      temp_suffix = int(time.time())
3699
      ren_fn = lambda d, suff: (d.physical_id[0],
3700
                                d.physical_id[1] + "_replaced-%s" % suff)
3701
      # build the rename list based on what LVs exist on the node
3702
      rlist = []
3703
      for to_ren in old_lvs:
3704
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3705
        if find_res is not None: # device exists
3706
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3707

    
3708
      info("renaming the old LVs on the target node")
3709
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3710
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3711
      # now we rename the new LVs to the old LVs
3712
      info("renaming the new LVs on the target node")
3713
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3714
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3715
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3716

    
3717
      for old, new in zip(old_lvs, new_lvs):
3718
        new.logical_id = old.logical_id
3719
        cfg.SetDiskID(new, tgt_node)
3720

    
3721
      for disk in old_lvs:
3722
        disk.logical_id = ren_fn(disk, temp_suffix)
3723
        cfg.SetDiskID(disk, tgt_node)
3724

    
3725
      # now that the new lvs have the old name, we can add them to the device
3726
      info("adding new mirror component on %s" % tgt_node)
3727
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3728
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv.logical_id[1],
                    hint="manually cleanup unused logical volumes")
3732
        raise errors.OpExecError("Can't add local storage to drbd")
3733

    
3734
      dev.children = new_lvs
3735
      cfg.Update(instance)
3736

    
3737
    # Step: wait for sync
3738

    
3739
    # this can fail as the old devices are degraded and _WaitForSync
3740
    # does a combined result over all disks, so we don't check its
3741
    # return value
3742
    self.proc.LogStep(5, steps_total, "sync devices")
3743
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3744

    
3745
    # so check manually all the devices
3746
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3747
      cfg.SetDiskID(dev, instance.primary_node)
3748
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3749
      if is_degr:
3750
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3751

    
3752
    # Step: remove old storage
3753
    self.proc.LogStep(6, steps_total, "removing old storage")
3754
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3755
      info("remove logical volumes for %s" % name)
3756
      for lv in old_lvs:
3757
        cfg.SetDiskID(lv, tgt_node)
3758
        if not rpc.call_blockdev_remove(tgt_node, lv):
3759
          warning("Can't remove old LV", hint="manually remove unused LVs")
3760
          continue
3761

    
3762
  def _ExecD8Secondary(self, feedback_fn):
3763
    """Replace the secondary node for drbd8.
3764

3765
    The algorithm for replace is quite complicated:
3766
      - for all disks of the instance:
3767
        - create new LVs on the new node with same names
3768
        - shutdown the drbd device on the old secondary
3769
        - disconnect the drbd network on the primary
3770
        - create the drbd device on the new secondary
3771
        - network attach the drbd on the primary, using an artifice:
3772
          the drbd code for Attach() will connect to the network if it
3773
          finds a device which is connected to the good local disks but
3774
          not network enabled
3775
      - wait for sync across all devices
3776
      - remove all disks from the old secondary
3777

3778
    Failures are not very well handled.
3779

3780
    """
3781
    steps_total = 6
3782
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3783
    instance = self.instance
3784
    iv_names = {}
3785
    vgname = self.cfg.GetVGName()
3786
    # start of work
3787
    cfg = self.cfg
3788
    old_node = self.tgt_node
3789
    new_node = self.new_node
3790
    pri_node = instance.primary_node
3791

    
3792
    # Step: check device activation
3793
    self.proc.LogStep(1, steps_total, "check device existence")
3794
    info("checking volume groups")
3795
    my_vg = cfg.GetVGName()
3796
    results = rpc.call_vg_list([pri_node, new_node])
3797
    if not results:
3798
      raise errors.OpExecError("Can't list volume groups on the nodes")
3799
    for node in pri_node, new_node:
3800
      res = results.get(node, False)
3801
      if not res or my_vg not in res:
3802
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3803
                                 (my_vg, node))
3804
    for dev in instance.disks:
3805
      if not dev.iv_name in self.op.disks:
3806
        continue
3807
      info("checking %s on %s" % (dev.iv_name, pri_node))
3808
      cfg.SetDiskID(dev, pri_node)
3809
      if not rpc.call_blockdev_find(pri_node, dev):
3810
        raise errors.OpExecError("Can't find device %s on node %s" %
3811
                                 (dev.iv_name, pri_node))
3812

    
3813
    # Step: check other node consistency
3814
    self.proc.LogStep(2, steps_total, "check peer consistency")
3815
    for dev in instance.disks:
3816
      if not dev.iv_name in self.op.disks:
3817
        continue
3818
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3819
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3820
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3821
                                 " unsafe to replace the secondary" %
3822
                                 pri_node)
3823

    
3824
    # Step: create new storage
3825
    self.proc.LogStep(3, steps_total, "allocate new storage")
3826
    for dev in instance.disks:
3827
      size = dev.size
3828
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3829
      # since we *always* want to create this LV, we use the
3830
      # _Create...OnPrimary (which forces the creation), even if we
3831
      # are talking about the secondary node
3832
      for new_lv in dev.children:
3833
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3834
                                        _GetInstanceInfoText(instance)):
3835
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3836
                                   " node '%s'" %
3837
                                   (new_lv.logical_id[1], new_node))
3838

    
3839
      iv_names[dev.iv_name] = (dev, dev.children)
3840

    
3841
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3842
    for dev in instance.disks:
3843
      size = dev.size
3844
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3845
      # create new devices on new_node
3846
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3847
                              logical_id=(pri_node, new_node,
3848
                                          dev.logical_id[2]),
3849
                              children=dev.children)
3850
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3851
                                        new_drbd, False,
3852
                                      _GetInstanceInfoText(instance)):
3853
        raise errors.OpExecError("Failed to create new DRBD on"
3854
                                 " node '%s'" % new_node)
3855

    
3856
    for dev in instance.disks:
3857
      # we have new devices, shutdown the drbd on the old secondary
3858
      info("shutting down drbd for %s on old node" % dev.iv_name)
3859
      cfg.SetDiskID(dev, old_node)
3860
      if not rpc.call_blockdev_shutdown(old_node, dev):
3861
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3862
                hint="Please cleanup this device manually as soon as possible")
3863

    
3864
    info("detaching primary drbds from the network (=> standalone)")
3865
    done = 0
3866
    for dev in instance.disks:
3867
      cfg.SetDiskID(dev, pri_node)
3868
      # set the physical (unique in bdev terms) id to None, meaning
3869
      # detach from network
3870
      dev.physical_id = (None,) * len(dev.physical_id)
3871
      # and 'find' the device, which will 'fix' it to match the
3872
      # standalone state
3873
      if rpc.call_blockdev_find(pri_node, dev):
3874
        done += 1
3875
      else:
3876
        warning("Failed to detach drbd %s from network, unusual case" %
3877
                dev.iv_name)
3878

    
3879
    if not done:
3880
      # no detaches succeeded (very unlikely)
3881
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3882

    
3883
    # if we managed to detach at least one, we update all the disks of
3884
    # the instance to point to the new secondary
3885
    info("updating instance configuration")
3886
    for dev in instance.disks:
3887
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3888
      cfg.SetDiskID(dev, pri_node)
3889
    cfg.Update(instance)
3890

    
3891
    # and now perform the drbd attach
3892
    info("attaching primary drbds to new secondary (standalone => connected)")
3893
    failures = []
3894
    for dev in instance.disks:
3895
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3896
      # since the attach is smart, it's enough to 'find' the device,
3897
      # it will automatically activate the network, if the physical_id
3898
      # is correct
3899
      cfg.SetDiskID(dev, pri_node)
3900
      if not rpc.call_blockdev_find(pri_node, dev):
3901
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3902
                "please do a gnt-instance info to see the status of disks")
3903

    
3904
    # this can fail as the old devices are degraded and _WaitForSync
3905
    # does a combined result over all disks, so we don't check its
3906
    # return value
3907
    self.proc.LogStep(5, steps_total, "sync devices")
3908
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3909

    
3910
    # so check manually all the devices
3911
    for name, (dev, old_lvs) in iv_names.iteritems():
3912
      cfg.SetDiskID(dev, pri_node)
3913
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3914
      if is_degr:
3915
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3916

    
3917
    self.proc.LogStep(6, steps_total, "removing old storage")
3918
    for name, (dev, old_lvs) in iv_names.iteritems():
3919
      info("remove logical volumes for %s" % name)
3920
      for lv in old_lvs:
3921
        cfg.SetDiskID(lv, old_node)
3922
        if not rpc.call_blockdev_remove(old_node, lv):
3923
          warning("Can't remove LV on old secondary",
3924
                  hint="Cleanup stale volumes by hand")
3925

    
3926
  def Exec(self, feedback_fn):
3927
    """Execute disk replacement.
3928

3929
    This dispatches the disk replacement to the appropriate handler.
3930

3931
    """
3932
    instance = self.instance
3933
    if instance.disk_template == constants.DT_REMOTE_RAID1:
3934
      fn = self._ExecRR1
3935
    elif instance.disk_template == constants.DT_DRBD8:
3936
      if self.op.remote_node is None:
3937
        fn = self._ExecD8DiskOnly
3938
      else:
3939
        fn = self._ExecD8Secondary
3940
    else:
3941
      raise errors.ProgrammerError("Unhandled disk replacement case")
3942
    return fn(feedback_fn)
3943

    
3944

    
3945
class LUQueryInstanceData(NoHooksLU):
3946
  """Query runtime instance data.
3947

3948
  """
3949
  _OP_REQP = ["instances"]
3950

    
3951
  def CheckPrereq(self):
3952
    """Check prerequisites.
3953

3954
    This only checks the optional instance list against the existing names.
3955

3956
    """
3957
    if not isinstance(self.op.instances, list):
3958
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3959
    if self.op.instances:
3960
      self.wanted_instances = []
3961
      names = self.op.instances
3962
      for name in names:
3963
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3964
        if instance is None:
3965
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3966
        self.wanted_instances.append(instance)
3967
    else:
3968
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3969
                               in self.cfg.GetInstanceList()]
3970
    return
3971

    
3972

    
3973
  def _ComputeDiskStatus(self, instance, snode, dev):
3974
    """Compute block device status.
3975

3976
    """
3977
    self.cfg.SetDiskID(dev, instance.primary_node)
3978
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3979
    if dev.dev_type in constants.LDS_DRBD:
3980
      # we change the snode then (otherwise we use the one passed in)
3981
      if dev.logical_id[0] == instance.primary_node:
3982
        snode = dev.logical_id[1]
3983
      else:
3984
        snode = dev.logical_id[0]
3985

    
3986
    if snode:
3987
      self.cfg.SetDiskID(dev, snode)
3988
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3989
    else:
3990
      dev_sstatus = None
3991

    
3992
    if dev.children:
3993
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3994
                      for child in dev.children]
3995
    else:
3996
      dev_children = []
3997

    
3998
    data = {
3999
      "iv_name": dev.iv_name,
4000
      "dev_type": dev.dev_type,
4001
      "logical_id": dev.logical_id,
4002
      "physical_id": dev.physical_id,
4003
      "pstatus": dev_pstatus,
4004
      "sstatus": dev_sstatus,
4005
      "children": dev_children,
4006
      }
4007

    
4008
    return data
4009

    
4010
  def Exec(self, feedback_fn):
4011
    """Gather and return data"""
4012
    result = {}
4013
    for instance in self.wanted_instances:
4014
      remote_info = rpc.call_instance_info(instance.primary_node,
4015
                                                instance.name)
4016
      if remote_info and "state" in remote_info:
4017
        remote_state = "up"
4018
      else:
4019
        remote_state = "down"
4020
      if instance.status == "down":
4021
        config_state = "down"
4022
      else:
4023
        config_state = "up"
4024

    
4025
      disks = [self._ComputeDiskStatus(instance, None, device)
4026
               for device in instance.disks]
4027

    
4028
      idict = {
4029
        "name": instance.name,
4030
        "config_state": config_state,
4031
        "run_state": remote_state,
4032
        "pnode": instance.primary_node,
4033
        "snodes": instance.secondary_nodes,
4034
        "os": instance.os,
4035
        "memory": instance.memory,
4036
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4037
        "disks": disks,
4038
        "network_port": instance.network_port,
4039
        "vcpus": instance.vcpus,
4040
        "kernel_path": instance.kernel_path,
4041
        "initrd_path": instance.initrd_path,
4042
        "hvm_boot_order": instance.hvm_boot_order,
4043
        }
4044

    
4045
      result[instance.name] = idict
4046

    
4047
    return result
4048

    
4049

    
4050
class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
4054
  HPATH = "instance-modify"
4055
  HTYPE = constants.HTYPE_INSTANCE
4056
  _OP_REQP = ["instance_name"]
4057

    
4058
  def BuildHooksEnv(self):
4059
    """Build hooks env.
4060

4061
    This runs on the master, primary and secondaries.
4062

4063
    """
4064
    args = dict()
4065
    if self.mem:
4066
      args['memory'] = self.mem
4067
    if self.vcpus:
4068
      args['vcpus'] = self.vcpus
4069
    if self.do_ip or self.do_bridge or self.mac:
4070
      if self.do_ip:
4071
        ip = self.ip
4072
      else:
4073
        ip = self.instance.nics[0].ip
4074
      if self.bridge:
4075
        bridge = self.bridge
4076
      else:
4077
        bridge = self.instance.nics[0].bridge
4078
      if self.mac:
4079
        mac = self.mac
4080
      else:
4081
        mac = self.instance.nics[0].mac
4082
      args['nics'] = [(ip, bridge, mac)]
4083
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4084
    nl = [self.sstore.GetMasterNode(),
4085
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4086
    return env, nl, nl
4087

    
4088
  def CheckPrereq(self):
4089
    """Check prerequisites.
4090

4091
    This only checks the instance list against the existing names.
4092

4093
    """
4094
    self.mem = getattr(self.op, "mem", None)
4095
    self.vcpus = getattr(self.op, "vcpus", None)
4096
    self.ip = getattr(self.op, "ip", None)
4097
    self.mac = getattr(self.op, "mac", None)
4098
    self.bridge = getattr(self.op, "bridge", None)
4099
    self.kernel_path = getattr(self.op, "kernel_path", None)
4100
    self.initrd_path = getattr(self.op, "initrd_path", None)
4101
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4102
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4103
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
4104
    if all_parms.count(None) == len(all_parms):
4105
      raise errors.OpPrereqError("No changes submitted")
4106
    if self.mem is not None:
4107
      try:
4108
        self.mem = int(self.mem)
4109
      except ValueError, err:
4110
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4111
    if self.vcpus is not None:
4112
      try:
4113
        self.vcpus = int(self.vcpus)
4114
      except ValueError, err:
4115
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4116
    if self.ip is not None:
4117
      self.do_ip = True
4118
      if self.ip.lower() == "none":
4119
        self.ip = None
4120
      else:
4121
        if not utils.IsValidIP(self.ip):
4122
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4123
    else:
4124
      self.do_ip = False
4125
    self.do_bridge = (self.bridge is not None)
4126
    if self.mac is not None:
4127
      if self.cfg.IsMacInUse(self.mac):
4128
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4129
                                   self.mac)
4130
      if not utils.IsValidMac(self.mac):
4131
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4132

    
4133
    if self.kernel_path is not None:
4134
      self.do_kernel_path = True
4135
      if self.kernel_path == constants.VALUE_NONE:
4136
        raise errors.OpPrereqError("Can't set instance to no kernel")
4137

    
4138
      if self.kernel_path != constants.VALUE_DEFAULT:
4139
        if not os.path.isabs(self.kernel_path):
4140
          raise errors.OpPrereqError("The kernel path must be an absolute"
4141
                                    " filename")
4142
    else:
4143
      self.do_kernel_path = False
4144

    
4145
    if self.initrd_path is not None:
4146
      self.do_initrd_path = True
4147
      if self.initrd_path not in (constants.VALUE_NONE,
4148
                                  constants.VALUE_DEFAULT):
4149
        if not os.path.isabs(self.initrd_path):
4150
          raise errors.OpPrereqError("The initrd path must be an absolute"
4151
                                    " filename")
4152
    else:
4153
      self.do_initrd_path = False
4154

    
4155
    # boot order verification
4156
    if self.hvm_boot_order is not None:
4157
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4158
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4159
          raise errors.OpPrereqError("invalid boot order specified,"
4160
                                     " must be one or more of [acdn]"
4161
                                     " or 'default'")
4162

    
4163
    instance = self.cfg.GetInstanceInfo(
4164
      self.cfg.ExpandInstanceName(self.op.instance_name))
4165
    if instance is None:
4166
      raise errors.OpPrereqError("No such instance name '%s'" %
4167
                                 self.op.instance_name)
4168
    self.op.instance_name = instance.name
4169
    self.instance = instance
4170
    return
4171

    
4172
  def Exec(self, feedback_fn):
4173
    """Modifies an instance.
4174

4175
    All parameters take effect only at the next restart of the instance.
4176
    """
4177
    result = []
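    # every change that is applied gets recorded as a (parameter, new value)
    # pair; the accumulated list is returned to the caller for reporting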
4178
    instance = self.instance
4179
    if self.mem:
4180
      instance.memory = self.mem
4181
      result.append(("mem", self.mem))
4182
    if self.vcpus:
4183
      instance.vcpus = self.vcpus
4184
      result.append(("vcpus",  self.vcpus))
4185
    if self.do_ip:
4186
      instance.nics[0].ip = self.ip
4187
      result.append(("ip", self.ip))
4188
    if self.bridge:
4189
      instance.nics[0].bridge = self.bridge
4190
      result.append(("bridge", self.bridge))
4191
    if self.mac:
4192
      instance.nics[0].mac = self.mac
4193
      result.append(("mac", self.mac))
4194
    if self.do_kernel_path:
4195
      instance.kernel_path = self.kernel_path
4196
      result.append(("kernel_path", self.kernel_path))
4197
    if self.do_initrd_path:
4198
      instance.initrd_path = self.initrd_path
4199
      result.append(("initrd_path", self.initrd_path))
4200
    if self.hvm_boot_order:
4201
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4202
        instance.hvm_boot_order = None
4203
      else:
4204
        instance.hvm_boot_order = self.hvm_boot_order
4205
      result.append(("hvm_boot_order", self.hvm_boot_order))
4206

    
4207
    self.cfg.AddInstance(instance)
4208

    
4209
    return result
4210

    
4211

    
4212
class LUQueryExports(NoHooksLU):
4213
  """Query the exports list
4214

4215
  """
4216
  _OP_REQP = []
4217

    
4218
  def CheckPrereq(self):
4219
    """Check that the nodelist contains only existing nodes.
4220

4221
    """
4222
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4223

    
4224
  def Exec(self, feedback_fn):
4225
    """Compute the list of all the exported system images.
4226

4227
    Returns:
4228
      a dictionary with the structure node->(export-list)
4229
      where export-list is a list of the instances exported on
4230
      that node.
4231

4232
    """
4233
    return rpc.call_export_list(self.nodes)
4234

    
4235

    
4236
class LUExportInstance(LogicalUnit):
4237
  """Export an instance to an image in the cluster.
4238

4239
  """
4240
  HPATH = "instance-export"
4241
  HTYPE = constants.HTYPE_INSTANCE
4242
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4243

    
4244
  def BuildHooksEnv(self):
4245
    """Build hooks env.
4246

4247
    This will run on the master, primary node and target node.
4248

4249
    """
4250
    env = {
4251
      "EXPORT_NODE": self.op.target_node,
4252
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4253
      }
4254
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4255
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and target node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

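    # snapshot the instance's disks while it is (optionally) shut down; the
    # finally clause below restarts it if it was shut down here, and only
    # the "sda" disk is considered for the export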
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

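    # copy each snapshot to the target node and then delete the temporary
    # snapshot volume on the source node; failures are only logged and do
    # not abort the export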
    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

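    # finally, remove older exports of this instance from all nodes except
    # the one we have just exported to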
    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal;
    # if we proceed, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
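    # resolve self.op.kind and self.op.name into self.target, the
    # configuration object (cluster, node or instance) that the tag
    # operation will act upon; node and instance names are canonicalised
    # to their fully expanded form on the way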
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
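    # the tags are kept as a set of plain strings on the target object; a
    # client typically reaches this point by submitting, for example,
    # opcodes.OpGetTags(kind=constants.TAG_INSTANCE, name="instance1.example.com")
    # (the instance name above is illustrative only)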
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs matching the search pattern.

    """
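    # build the list of (path, object) pairs to search: the cluster itself,
    # then every instance and every node; each matching tag is reported as
    # a (path, tag) pair, e.g. ("/instances/instance1.example.com", "mytag")
    # (the example values are illustrative)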
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets one or more tags on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of each tag passed.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
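    # AddTag can still refuse a tag at execution time (e.g. if the object
    # already carries the maximum number of tags), which is reported as an
    # execution error; the config update below can race with a concurrent
    # configuration change, in which case the caller is asked to retry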
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tags.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
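    # every tag that is to be removed must currently be set on the target
    # object; otherwise the missing ones are reported and nothing is changed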
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
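    # sleep first on the master itself (if requested) and then, via a
    # multi-node RPC call, on each of the requested nodes; any failing
    # node aborts the operation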
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))