#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). If there
    are no nodes to run on, an empty list (and not None) should be
    returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


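# For orientation, a processor is expected to drive a LogicalUnit roughly as
# follows (an illustrative sketch only, not actual processor code; LUSomeOp
# stands for a hypothetical subclass):
#
#   lu = LUSomeOp(proc, op, cfg, sstore)   # __init__ checks _OP_REQP et al.
#   lu.CheckPrereq()                       # may raise errors.OpPrereqError
#   env, pre, post = lu.BuildHooksEnv()    # skipped when HPATH is None
#   result = lu.Exec(feedback_fn)          # may raise errors.OpExecError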
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings), or an empty list for all nodes

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings), or an empty list for all instances

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


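# Both _GetWanted helpers above behave the same way: an explicit list of names
# is expanded and validated one by one, while an empty list selects everything;
# e.g. _GetWantedNodes(lu, []) is simply utils.NiceSort(lu.cfg.GetNodeList()).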
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields selected by the caller

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


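# Illustration of _CheckOutputFields above: with static=["name"] and
# dynamic=["mfree"], a selection of ["name", "mfree"] passes silently, while
# ["name", "foo"] raises OpPrereqError listing "foo" as an unknown field.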
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


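# For illustration, _BuildInstanceHookEnv above would produce, for a
# single-NIC instance, an env roughly like (all values made up):
#   {"OP_TARGET": "inst1.example.com", "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1, "INSTANCE_NIC_COUNT": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33"}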
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  for rawline in f:
    logger.Debug('read %s' % (repr(rawline),))

    parts = rawline.rstrip('\r\n').split()

    # Ignore unwanted lines
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      fields = parts[0].split(',')
      key = parts[2]

      haveall = True
      havesome = False
      for spec in [ ip, fullnode ]:
        if spec not in fields:
          haveall = False
        if spec in fields:
          havesome = True

      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      if haveall and key == pubkey:
        inthere = True
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
        continue

      if havesome and (not haveall or key != pubkey):
        removed = True
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
        continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


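# The known_hosts entries handled by _UpdateKnownHosts above have the form
# "name,ip keytype key", for example (made-up key material):
#   node1.example.com,192.0.2.10 ssh-rsa AAAAB3NzaC1yc2EAAA...
# parts[0] is the comma-separated host list and parts[2] is the key that gets
# compared against the cluster public key.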
def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


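# Illustration: _HasValidVG({"xenvg": 102400}, "xenvg") returns None (valid),
# _HasValidVG({"xenvg": 10240}, "xenvg") returns the "too small" message, and
# _HasValidVG({}, "xenvg") reports the volume group as missing.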
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.


  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


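# Note on _InitGanetiServerSetup above: the openssl invocation writes both the
# private key and the self-signed certificate (valid for five years) into the
# same file, constants.SSL_CERT_FILE, which is then restricted to mode 0400.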
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
                         constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
                           constants.DEFAULT_NODED_PORT))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


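# LUVerifyDisks.Exec above returns the 4-tuple (res_nodes, res_nlvm,
# res_instances, res_missing): nodes that could not be queried, per-node LVM
# error messages, instances whose mirror volumes are not online, and a map
# from instance name to the (node, volume) pairs that could not be found.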
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


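# Each mirror status entry unpacked in _WaitForSync above is the tuple
# (sync_percent, estimated_time, is_degraded, ldisk); a device at 80.5% with
# 120 seconds left would be reported as
# "- device sda: 80.50% done, 120 estimated seconds remaining" (illustrative).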
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


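# LUQueryNodes.Exec above returns one row per node with the values in the same
# order as op.output_fields; e.g. for output_fields=["name", "pip", "mfree"] a
# row could look like ["node1.example.com", "192.0.2.10", 1024] (illustrative).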
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
1405
  """Logical unit for adding node to the cluster.
1406

1407
  """
1408
  HPATH = "node-add"
1409
  HTYPE = constants.HTYPE_NODE
1410
  _OP_REQP = ["node_name"]
1411

    
1412
  def BuildHooksEnv(self):
1413
    """Build hooks env.
1414

1415
    This will run on all nodes before, and on all nodes + the new node after.
1416

1417
    """
1418
    env = {
1419
      "OP_TARGET": self.op.node_name,
1420
      "NODE_NAME": self.op.node_name,
1421
      "NODE_PIP": self.op.primary_ip,
1422
      "NODE_SIP": self.op.secondary_ip,
1423
      }
1424
    nodes_0 = self.cfg.GetNodeList()
1425
    nodes_1 = nodes_0 + [self.op.node_name, ]
1426
    return env, nodes_0, nodes_1
1427

    
1428
  def CheckPrereq(self):
1429
    """Check prerequisites.
1430

1431
    This checks:
1432
     - the new node is not already in the config
1433
     - it is resolvable
1434
     - its parameters (single/dual homed) matches the cluster
1435

1436
    Any errors are signalled by raising errors.OpPrereqError.
1437

1438
    """
1439
    node_name = self.op.node_name
1440
    cfg = self.cfg
1441

    
1442
    dns_data = utils.HostInfo(node_name)
1443

    
1444
    node = dns_data.name
1445
    primary_ip = self.op.primary_ip = dns_data.ip
1446
    secondary_ip = getattr(self.op, "secondary_ip", None)
1447
    if secondary_ip is None:
1448
      secondary_ip = primary_ip
1449
    if not utils.IsValidIP(secondary_ip):
1450
      raise errors.OpPrereqError("Invalid secondary IP given")
1451
    self.op.secondary_ip = secondary_ip
1452
    node_list = cfg.GetNodeList()
1453
    if node in node_list:
1454
      raise errors.OpPrereqError("Node %s is already in the configuration"
1455
                                 % node)
1456

    
1457
    for existing_node_name in node_list:
1458
      existing_node = cfg.GetNodeInfo(existing_node_name)
1459
      if (existing_node.primary_ip == primary_ip or
1460
          existing_node.secondary_ip == primary_ip or
1461
          existing_node.primary_ip == secondary_ip or
1462
          existing_node.secondary_ip == secondary_ip):
1463
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1464
                                   " existing node %s" % existing_node.name)
1465

    
1466
    # check that the type of the node (single versus dual homed) is the
1467
    # same as for the master
1468
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1469
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1470
    newbie_singlehomed = secondary_ip == primary_ip
1471
    if master_singlehomed != newbie_singlehomed:
1472
      if master_singlehomed:
1473
        raise errors.OpPrereqError("The master has no private ip but the"
1474
                                   " new node has one")
1475
      else:
1476
        raise errors.OpPrereqError("The master has a private ip but the"
1477
                                   " new node doesn't have one")
1478

    
1479
    # checks reachablity
1480
    if not utils.TcpPing(utils.HostInfo().name,
1481
                         primary_ip,
1482
                         constants.DEFAULT_NODED_PORT):
1483
      raise errors.OpPrereqError("Node not reachable by ping")
1484

    
1485
    if not newbie_singlehomed:
1486
      # check reachability from my secondary ip to newbie's secondary ip
1487
      if not utils.TcpPing(myself.secondary_ip,
1488
                           secondary_ip,
1489
                           constants.DEFAULT_NODED_PORT):
1490
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1491
                                   " based ping to noded port")
1492

    
1493
    self.new_node = objects.Node(name=node,
1494
                                 primary_ip=primary_ip,
1495
                                 secondary_ip=secondary_ip)
1496

    
1497
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1498
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1499
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1500
                                   constants.VNC_PASSWORD_FILE)
1501

    
1502
  def Exec(self, feedback_fn):
1503
    """Adds the new node to the cluster.
1504

1505
    """
1506
    new_node = self.new_node
1507
    node = new_node.name
1508

    
1509
    # set up inter-node password and certificate and restarts the node daemon
1510
    gntpass = self.sstore.GetNodeDaemonPassword()
1511
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1512
      raise errors.OpExecError("ganeti password corruption detected")
1513
    f = open(constants.SSL_CERT_FILE)
1514
    try:
1515
      gntpem = f.read(8192)
1516
    finally:
1517
      f.close()
1518
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1519
    # so we use this to detect an invalid certificate; as long as the
1520
    # cert doesn't contain this, the here-document will be correctly
1521
    # parsed by the shell sequence below
1522
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1523
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1524
    if not gntpem.endswith("\n"):
1525
      raise errors.OpExecError("PEM must end with newline")
1526
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1527

    
1528
    # and then connect with ssh to set password and start ganeti-noded
1529
    # note that all the below variables are sanitized at this point,
1530
    # either by being constants or by the checks above
1531
    ss = self.sstore
1532
    mycommand = ("umask 077 && "
1533
                 "echo '%s' > '%s' && "
1534
                 "cat > '%s' << '!EOF.' && \n"
1535
                 "%s!EOF.\n%s restart" %
1536
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1537
                  constants.SSL_CERT_FILE, gntpem,
1538
                  constants.NODE_INITD_SCRIPT))
1539

    
1540
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1541
    if result.failed:
1542
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1543
                               " output: %s" %
1544
                               (node, result.fail_reason, result.output))
1545

    
1546
    # check connectivity
1547
    time.sleep(4)
1548

    
1549
    result = rpc.call_version([node])[node]
1550
    if result:
1551
      if constants.PROTOCOL_VERSION == result:
1552
        logger.Info("communication to node %s fine, sw version %s match" %
1553
                    (node, result))
1554
      else:
1555
        raise errors.OpExecError("Version mismatch master version %s,"
1556
                                 " node version %s" %
1557
                                 (constants.PROTOCOL_VERSION, result))
1558
    else:
1559
      raise errors.OpExecError("Cannot get version from the new node")
1560

    
1561
    # setup ssh on node
1562
    logger.Info("copy ssh key to node %s" % node)
1563
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1564
    keyarray = []
1565
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1566
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1567
                priv_key, pub_key]
1568

    
1569
    for i in keyfiles:
1570
      f = open(i, 'r')
1571
      try:
1572
        keyarray.append(f.read())
1573
      finally:
1574
        f.close()
1575

    
1576
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1577
                               keyarray[3], keyarray[4], keyarray[5])
1578

    
1579
    if not result:
1580
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1581

    
1582
    # Add node to our /etc/hosts, and add key to known_hosts
1583
    _AddHostToEtcHosts(new_node.name)
1584

    
1585
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1586
                      self.cfg.GetHostKey())
1587

    
1588
    if new_node.secondary_ip != new_node.primary_ip:
1589
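      # ask the new node to TCP-ping its own secondary IP, to verify
      # that the address is really configured on it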
      if not rpc.call_node_tcp_ping(new_node.name,
1590
                                    constants.LOCALHOST_IP_ADDRESS,
1591
                                    new_node.secondary_ip,
1592
                                    constants.DEFAULT_NODED_PORT,
1593
                                    10, False):
1594
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1595
                                 " you gave (%s). Please fix and re-run this"
1596
                                 " command." % new_node.secondary_ip)
1597

    
1598
    success, msg = ssh.VerifyNodeHostname(node)
1599
    if not success:
1600
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1601
                               " than the one the resolver gives: %s."
1602
                               " Please fix and re-run this command." %
1603
                               (node, msg))
1604

    
1605
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1606
    # including the node just added
1607
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1608
    dist_nodes = self.cfg.GetNodeList() + [node]
1609
    if myself.name in dist_nodes:
1610
      dist_nodes.remove(myself.name)
1611

    
1612
    logger.Debug("Copying hosts and known_hosts to all nodes")
1613
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1614
      result = rpc.call_upload_file(dist_nodes, fname)
1615
      for to_node in dist_nodes:
1616
        if not result[to_node]:
1617
          logger.Error("copy of file %s to node %s failed" %
1618
                       (fname, to_node))
1619

    
1620
    to_copy = ss.GetFileList()
1621
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1622
      to_copy.append(constants.VNC_PASSWORD_FILE)
1623
    for fname in to_copy:
1624
      if not ssh.CopyFileToNode(node, fname):
1625
        logger.Error("could not copy file %s to node %s" % (fname, node))
1626

    
1627
    logger.Info("adding node %s to cluster.conf" % node)
1628
    self.cfg.AddNode(new_node)
1629

    
1630

    
1631
class LUMasterFailover(LogicalUnit):
1632
  """Failover the master node to the current node.
1633

1634
  This is a special LU in that it must run on a non-master node.
1635

1636
  """
1637
  HPATH = "master-failover"
1638
  HTYPE = constants.HTYPE_CLUSTER
1639
  REQ_MASTER = False
1640
  _OP_REQP = []
1641

    
1642
  def BuildHooksEnv(self):
1643
    """Build hooks env.
1644

1645
    This will run on the new master only in the pre phase, and on all
1646
    the nodes in the post phase.
1647

1648
    """
1649
    env = {
1650
      "OP_TARGET": self.new_master,
1651
      "NEW_MASTER": self.new_master,
1652
      "OLD_MASTER": self.old_master,
1653
      }
1654
    return env, [self.new_master], self.cfg.GetNodeList()
1655

    
1656
  def CheckPrereq(self):
1657
    """Check prerequisites.
1658

1659
    This checks that we are not already the master.
1660

1661
    """
1662
    self.new_master = utils.HostInfo().name
1663
    self.old_master = self.sstore.GetMasterNode()
1664

    
1665
    if self.old_master == self.new_master:
1666
      raise errors.OpPrereqError("This commands must be run on the node"
1667
                                 " where you want the new master to be."
1668
                                 " %s is already the master" %
1669
                                 self.old_master)
1670

    
1671
  def Exec(self, feedback_fn):
1672
    """Failover the master node.
1673

1674
    This command, when run on a non-master node, will cause the current
1675
    master to cease being master, and the non-master to become the new
1676
    master.
1677

1678
    """
1679
    #TODO: do not rely on gethostname returning the FQDN
1680
    logger.Info("setting master to %s, old master: %s" %
1681
                (self.new_master, self.old_master))
1682

    
1683
    if not rpc.call_node_stop_master(self.old_master):
1684
      logger.Error("could disable the master role on the old master"
1685
                   " %s, please disable manually" % self.old_master)
1686

    
1687
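    # record the new master in the simple store and push the updated
    # file to all nodes, so that they agree on who the master is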
    ss = self.sstore
1688
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1689
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1690
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1691
      logger.Error("could not distribute the new simple store master file"
1692
                   " to the other nodes, please check.")
1693

    
1694
    if not rpc.call_node_start_master(self.new_master):
1695
      logger.Error("could not start the master role on the new master"
1696
                   " %s, please check" % self.new_master)
1697
      feedback_fn("Error in activating the master IP on the new master,"
1698
                  " please fix manually.")
1699

    
1700

    
1701

    
1702
class LUQueryClusterInfo(NoHooksLU):
1703
  """Query cluster configuration.
1704

1705
  """
1706
  _OP_REQP = []
1707
  REQ_MASTER = False
1708

    
1709
  def CheckPrereq(self):
1710
    """No prerequsites needed for this LU.
1711

1712
    """
1713
    pass
1714

    
1715
  def Exec(self, feedback_fn):
1716
    """Return cluster config.
1717

1718
    """
1719
    result = {
1720
      "name": self.sstore.GetClusterName(),
1721
      "software_version": constants.RELEASE_VERSION,
1722
      "protocol_version": constants.PROTOCOL_VERSION,
1723
      "config_version": constants.CONFIG_VERSION,
1724
      "os_api_version": constants.OS_API_VERSION,
1725
      "export_version": constants.EXPORT_VERSION,
1726
      "master": self.sstore.GetMasterNode(),
1727
      "architecture": (platform.architecture()[0], platform.machine()),
1728
      }
1729

    
1730
    return result
1731

    
1732

    
1733
class LUClusterCopyFile(NoHooksLU):
1734
  """Copy file to cluster.
1735

1736
  """
1737
  _OP_REQP = ["nodes", "filename"]
1738

    
1739
  def CheckPrereq(self):
1740
    """Check prerequisites.
1741

1742
    It should check that the named file exists and that the given list
1743
    of nodes is valid.
1744

1745
    """
1746
    if not os.path.exists(self.op.filename):
1747
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1748

    
1749
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1750

    
1751
  def Exec(self, feedback_fn):
1752
    """Copy a file from master to some nodes.
1753

1754
    The file given by self.op.filename is copied via ssh to every node
    in self.nodes (computed in CheckPrereq); the master node itself is
    skipped.
1759

1760
    """
1761
    filename = self.op.filename
1762

    
1763
    myname = utils.HostInfo().name
1764

    
1765
    for node in self.nodes:
1766
      if node == myname:
1767
        continue
1768
      if not ssh.CopyFileToNode(node, filename):
1769
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1770

    
1771

    
1772
class LUDumpClusterConfig(NoHooksLU):
1773
  """Return a text-representation of the cluster-config.
1774

1775
  """
1776
  _OP_REQP = []
1777

    
1778
  def CheckPrereq(self):
1779
    """No prerequisites.
1780

1781
    """
1782
    pass
1783

    
1784
  def Exec(self, feedback_fn):
1785
    """Dump a representation of the cluster config to the standard output.
1786

1787
    """
1788
    return self.cfg.DumpConfig()
1789

    
1790

    
1791
class LURunClusterCommand(NoHooksLU):
1792
  """Run a command on some nodes.
1793

1794
  """
1795
  _OP_REQP = ["command", "nodes"]
1796

    
1797
  def CheckPrereq(self):
1798
    """Check prerequisites.
1799

1800
    It checks that the given list of nodes is valid.
1801

1802
    """
1803
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1804

    
1805
  def Exec(self, feedback_fn):
1806
    """Run a command on some nodes.
1807

1808
    """
1809
    data = []
1810
    for node in self.nodes:
1811
      result = ssh.SSHCall(node, "root", self.op.command)
1812
      data.append((node, result.output, result.exit_code))
1813

    
1814
    return data
1815

    
1816

    
1817
class LUActivateInstanceDisks(NoHooksLU):
1818
  """Bring up an instance's disks.
1819

1820
  """
1821
  _OP_REQP = ["instance_name"]
1822

    
1823
  def CheckPrereq(self):
1824
    """Check prerequisites.
1825

1826
    This checks that the instance is in the cluster.
1827

1828
    """
1829
    instance = self.cfg.GetInstanceInfo(
1830
      self.cfg.ExpandInstanceName(self.op.instance_name))
1831
    if instance is None:
1832
      raise errors.OpPrereqError("Instance '%s' not known" %
1833
                                 self.op.instance_name)
1834
    self.instance = instance
1835

    
1836

    
1837
  def Exec(self, feedback_fn):
1838
    """Activate the disks.
1839

1840
    """
1841
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1842
    if not disks_ok:
1843
      raise errors.OpExecError("Cannot activate block devices")
1844

    
1845
    return disks_info
1846

    
1847

    
1848
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1849
  """Prepare the block devices for an instance.
1850

1851
  This sets up the block devices on all nodes.
1852

1853
  Args:
1854
    instance: a ganeti.objects.Instance object
1855
    ignore_secondaries: if true, errors on secondary nodes won't result
1856
                        in an error return from the function
1857

1858
  Returns:
1859
    false if the operation failed
1860
    list of (host, instance_visible_name, node_visible_name) if the operation
1861
         succeeded with the mapping from node devices to instance devices
1862
  """
1863
  device_info = []
1864
  disks_ok = True
1865
  iname = instance.name
1866
  # With the two passes mechanism we try to reduce the window of
1867
  # opportunity for the race condition of switching DRBD to primary
1868
  # before handshaking occurred, but we do not eliminate it
1869

    
1870
  # The proper fix would be to wait (with some limits) until the
1871
  # connection has been made and drbd transitions from WFConnection
1872
  # into any other network-connected state (Connected, SyncTarget,
1873
  # SyncSource, etc.)
1874

    
1875
  # 1st pass, assemble on all nodes in secondary mode
1876
  for inst_disk in instance.disks:
1877
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1878
      cfg.SetDiskID(node_disk, node)
1879
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1880
      if not result:
1881
        logger.Error("could not prepare block device %s on node %s"
1882
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1883
        if not ignore_secondaries:
1884
          disks_ok = False
1885

    
1886
  # FIXME: race condition on drbd migration to primary
1887

    
1888
  # 2nd pass, do only the primary node
1889
  for inst_disk in instance.disks:
1890
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1891
      if node != instance.primary_node:
1892
        continue
1893
      cfg.SetDiskID(node_disk, node)
1894
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1895
      if not result:
1896
        logger.Error("could not prepare block device %s on node %s"
1897
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1898
        disks_ok = False
1899
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1900

    
1901
  # leave the disks configured for the primary node
1902
  # this is a workaround that would be fixed better by
1903
  # improving the logical/physical id handling
1904
  for disk in instance.disks:
1905
    cfg.SetDiskID(disk, instance.primary_node)
1906

    
1907
  return disks_ok, device_info
1908

    
1909

    
1910
def _StartInstanceDisks(cfg, instance, force):
1911
  """Start the disks of an instance.
1912

1913
  """
1914
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1915
                                           ignore_secondaries=force)
1916
  if not disks_ok:
1917
    _ShutdownInstanceDisks(instance, cfg)
1918
    if force is not None and not force:
1919
      logger.Error("If the message above refers to a secondary node,"
1920
                   " you can retry the operation using '--force'.")
1921
    raise errors.OpExecError("Disk consistency error")
1922

    
1923

    
1924
class LUDeactivateInstanceDisks(NoHooksLU):
1925
  """Shutdown an instance's disks.
1926

1927
  """
1928
  _OP_REQP = ["instance_name"]
1929

    
1930
  def CheckPrereq(self):
1931
    """Check prerequisites.
1932

1933
    This checks that the instance is in the cluster.
1934

1935
    """
1936
    instance = self.cfg.GetInstanceInfo(
1937
      self.cfg.ExpandInstanceName(self.op.instance_name))
1938
    if instance is None:
1939
      raise errors.OpPrereqError("Instance '%s' not known" %
1940
                                 self.op.instance_name)
1941
    self.instance = instance
1942

    
1943
  def Exec(self, feedback_fn):
1944
    """Deactivate the disks
1945

1946
    """
1947
    instance = self.instance
1948
    ins_l = rpc.call_instance_list([instance.primary_node])
1949
    ins_l = ins_l[instance.primary_node]
1950
    if not isinstance(ins_l, list):
1951
      raise errors.OpExecError("Can't contact node '%s'" %
1952
                               instance.primary_node)
1953

    
1954
    if self.instance.name in ins_l:
1955
      raise errors.OpExecError("Instance is running, can't shutdown"
1956
                               " block devices.")
1957

    
1958
    _ShutdownInstanceDisks(instance, self.cfg)
1959

    
1960

    
1961
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1962
  """Shutdown block devices of an instance.
1963

1964
  This does the shutdown on all nodes of the instance.
1965

1966
  If ignore_primary is true, errors on the primary node are ignored;
  errors on any other node always make the function return False.
1968

1969
  """
1970
  result = True
1971
  for disk in instance.disks:
1972
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1973
      cfg.SetDiskID(top_disk, node)
1974
      if not rpc.call_blockdev_shutdown(node, top_disk):
1975
        logger.Error("could not shutdown block device %s on node %s" %
1976
                     (disk.iv_name, node))
1977
        if not ignore_primary or node != instance.primary_node:
1978
          result = False
1979
  return result
1980

    
1981

    
1982
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1983
  """Checks if a node has enough free memory.
1984

1985
  This function checks if a given node has the needed amount of free
1986
  memory. In case the node has less memory or we cannot get the
1987
  information from the node, this function raises an OpPrereqError
1988
  exception.
1989

1990
  Args:
1991
    - cfg: a ConfigWriter instance
1992
    - node: the node name
1993
    - reason: string to use in the error message
1994
    - requested: the amount of memory in MiB
1995

1996
  """
1997
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1998
  if not nodeinfo or not isinstance(nodeinfo, dict):
1999
    raise errors.OpPrereqError("Could not contact node %s for resource"
2000
                             " information" % (node,))
2001

    
2002
  free_mem = nodeinfo[node].get('memory_free')
2003
  if not isinstance(free_mem, int):
2004
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2005
                             " was '%s'" % (node, free_mem))
2006
  if requested > free_mem:
2007
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2008
                             " needed %s MiB, available %s MiB" %
2009
                             (node, reason, requested, free_mem))
2010

    
2011

    
2012
class LUStartupInstance(LogicalUnit):
2013
  """Starts an instance.
2014

2015
  """
2016
  HPATH = "instance-start"
2017
  HTYPE = constants.HTYPE_INSTANCE
2018
  _OP_REQP = ["instance_name", "force"]
2019

    
2020
  def BuildHooksEnv(self):
2021
    """Build hooks env.
2022

2023
    This runs on master, primary and secondary nodes of the instance.
2024

2025
    """
2026
    env = {
2027
      "FORCE": self.op.force,
2028
      }
2029
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2030
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2031
          list(self.instance.secondary_nodes))
2032
    return env, nl, nl
2033

    
2034
  def CheckPrereq(self):
2035
    """Check prerequisites.
2036

2037
    This checks that the instance is in the cluster.
2038

2039
    """
2040
    instance = self.cfg.GetInstanceInfo(
2041
      self.cfg.ExpandInstanceName(self.op.instance_name))
2042
    if instance is None:
2043
      raise errors.OpPrereqError("Instance '%s' not known" %
2044
                                 self.op.instance_name)
2045

    
2046
    # check bridges existence
2047
    _CheckInstanceBridgesExist(instance)
2048

    
2049
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2050
                         "starting instance %s" % instance.name,
2051
                         instance.memory)
2052

    
2053
    self.instance = instance
2054
    self.op.instance_name = instance.name
2055

    
2056
  def Exec(self, feedback_fn):
2057
    """Start the instance.
2058

2059
    """
2060
    instance = self.instance
2061
    force = self.op.force
2062
    extra_args = getattr(self.op, "extra_args", "")
2063

    
2064
    node_current = instance.primary_node
2065

    
2066
    _StartInstanceDisks(self.cfg, instance, force)
2067

    
2068
    if not rpc.call_instance_start(node_current, instance, extra_args):
2069
      _ShutdownInstanceDisks(instance, self.cfg)
2070
      raise errors.OpExecError("Could not start instance")
2071

    
2072
    self.cfg.MarkInstanceUp(instance.name)
2073

    
2074

    
2075
class LURebootInstance(LogicalUnit):
2076
  """Reboot an instance.
2077

2078
  """
2079
  HPATH = "instance-reboot"
2080
  HTYPE = constants.HTYPE_INSTANCE
2081
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2082

    
2083
  def BuildHooksEnv(self):
2084
    """Build hooks env.
2085

2086
    This runs on master, primary and secondary nodes of the instance.
2087

2088
    """
2089
    env = {
2090
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2091
      }
2092
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2093
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2094
          list(self.instance.secondary_nodes))
2095
    return env, nl, nl
2096

    
2097
  def CheckPrereq(self):
2098
    """Check prerequisites.
2099

2100
    This checks that the instance is in the cluster.
2101

2102
    """
2103
    instance = self.cfg.GetInstanceInfo(
2104
      self.cfg.ExpandInstanceName(self.op.instance_name))
2105
    if instance is None:
2106
      raise errors.OpPrereqError("Instance '%s' not known" %
2107
                                 self.op.instance_name)
2108

    
2109
    # check bridges existence
2110
    _CheckInstanceBridgesExist(instance)
2111

    
2112
    self.instance = instance
2113
    self.op.instance_name = instance.name
2114

    
2115
  def Exec(self, feedback_fn):
2116
    """Reboot the instance.
2117

2118
    """
2119
    instance = self.instance
2120
    ignore_secondaries = self.op.ignore_secondaries
2121
    reboot_type = self.op.reboot_type
2122
    extra_args = getattr(self.op, "extra_args", "")
2123

    
2124
    node_current = instance.primary_node
2125

    
2126
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2127
                           constants.INSTANCE_REBOOT_HARD,
2128
                           constants.INSTANCE_REBOOT_FULL]:
2129
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2130
                                  (constants.INSTANCE_REBOOT_SOFT,
2131
                                   constants.INSTANCE_REBOOT_HARD,
2132
                                   constants.INSTANCE_REBOOT_FULL))
2133

    
2134
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2135
                       constants.INSTANCE_REBOOT_HARD]:
2136
      if not rpc.call_instance_reboot(node_current, instance,
2137
                                      reboot_type, extra_args):
2138
        raise errors.OpExecError("Could not reboot instance")
2139
    else:
2140
      if not rpc.call_instance_shutdown(node_current, instance):
2141
        raise errors.OpExecError("could not shutdown instance for full reboot")
2142
      _ShutdownInstanceDisks(instance, self.cfg)
2143
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2144
      if not rpc.call_instance_start(node_current, instance, extra_args):
2145
        _ShutdownInstanceDisks(instance, self.cfg)
2146
        raise errors.OpExecError("Could not start instance for full reboot")
2147

    
2148
    self.cfg.MarkInstanceUp(instance.name)
2149

    
2150

    
2151
class LUShutdownInstance(LogicalUnit):
2152
  """Shutdown an instance.
2153

2154
  """
2155
  HPATH = "instance-stop"
2156
  HTYPE = constants.HTYPE_INSTANCE
2157
  _OP_REQP = ["instance_name"]
2158

    
2159
  def BuildHooksEnv(self):
2160
    """Build hooks env.
2161

2162
    This runs on master, primary and secondary nodes of the instance.
2163

2164
    """
2165
    env = _BuildInstanceHookEnvByObject(self.instance)
2166
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2167
          list(self.instance.secondary_nodes))
2168
    return env, nl, nl
2169

    
2170
  def CheckPrereq(self):
2171
    """Check prerequisites.
2172

2173
    This checks that the instance is in the cluster.
2174

2175
    """
2176
    instance = self.cfg.GetInstanceInfo(
2177
      self.cfg.ExpandInstanceName(self.op.instance_name))
2178
    if instance is None:
2179
      raise errors.OpPrereqError("Instance '%s' not known" %
2180
                                 self.op.instance_name)
2181
    self.instance = instance
2182

    
2183
  def Exec(self, feedback_fn):
2184
    """Shutdown the instance.
2185

2186
    """
2187
    instance = self.instance
2188
    node_current = instance.primary_node
2189
    if not rpc.call_instance_shutdown(node_current, instance):
2190
      logger.Error("could not shutdown instance")
2191

    
2192
    self.cfg.MarkInstanceDown(instance.name)
2193
    _ShutdownInstanceDisks(instance, self.cfg)
2194

    
2195

    
2196
class LUReinstallInstance(LogicalUnit):
2197
  """Reinstall an instance.
2198

2199
  """
2200
  HPATH = "instance-reinstall"
2201
  HTYPE = constants.HTYPE_INSTANCE
2202
  _OP_REQP = ["instance_name"]
2203

    
2204
  def BuildHooksEnv(self):
2205
    """Build hooks env.
2206

2207
    This runs on master, primary and secondary nodes of the instance.
2208

2209
    """
2210
    env = _BuildInstanceHookEnvByObject(self.instance)
2211
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2212
          list(self.instance.secondary_nodes))
2213
    return env, nl, nl
2214

    
2215
  def CheckPrereq(self):
2216
    """Check prerequisites.
2217

2218
    This checks that the instance is in the cluster and is not running.
2219

2220
    """
2221
    instance = self.cfg.GetInstanceInfo(
2222
      self.cfg.ExpandInstanceName(self.op.instance_name))
2223
    if instance is None:
2224
      raise errors.OpPrereqError("Instance '%s' not known" %
2225
                                 self.op.instance_name)
2226
    if instance.disk_template == constants.DT_DISKLESS:
2227
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2228
                                 self.op.instance_name)
2229
    if instance.status != "down":
2230
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2231
                                 self.op.instance_name)
2232
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2233
    if remote_info:
2234
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2235
                                 (self.op.instance_name,
2236
                                  instance.primary_node))
2237

    
2238
    self.op.os_type = getattr(self.op, "os_type", None)
2239
    if self.op.os_type is not None:
2240
      # OS verification
2241
      pnode = self.cfg.GetNodeInfo(
2242
        self.cfg.ExpandNodeName(instance.primary_node))
2243
      if pnode is None:
2244
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2245
                                   self.op.pnode)
2246
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2247
      if not os_obj:
2248
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2249
                                   " primary node"  % self.op.os_type)
2250

    
2251
    self.instance = instance
2252

    
2253
  def Exec(self, feedback_fn):
2254
    """Reinstall the instance.
2255

2256
    """
2257
    inst = self.instance
2258

    
2259
    if self.op.os_type is not None:
2260
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2261
      inst.os = self.op.os_type
2262
      self.cfg.AddInstance(inst)
2263

    
2264
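    # activate the disks; passing force=None means a failure will not
    # suggest retrying with --force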
    _StartInstanceDisks(self.cfg, inst, None)
2265
    try:
2266
      feedback_fn("Running the instance OS create scripts...")
2267
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2268
        raise errors.OpExecError("Could not install OS for instance %s"
2269
                                 " on node %s" %
2270
                                 (inst.name, inst.primary_node))
2271
    finally:
2272
      _ShutdownInstanceDisks(inst, self.cfg)
2273

    
2274

    
2275
class LURenameInstance(LogicalUnit):
2276
  """Rename an instance.
2277

2278
  """
2279
  HPATH = "instance-rename"
2280
  HTYPE = constants.HTYPE_INSTANCE
2281
  _OP_REQP = ["instance_name", "new_name"]
2282

    
2283
  def BuildHooksEnv(self):
2284
    """Build hooks env.
2285

2286
    This runs on master, primary and secondary nodes of the instance.
2287

2288
    """
2289
    env = _BuildInstanceHookEnvByObject(self.instance)
2290
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2291
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2292
          list(self.instance.secondary_nodes))
2293
    return env, nl, nl
2294

    
2295
  def CheckPrereq(self):
2296
    """Check prerequisites.
2297

2298
    This checks that the instance is in the cluster and is not running.
2299

2300
    """
2301
    instance = self.cfg.GetInstanceInfo(
2302
      self.cfg.ExpandInstanceName(self.op.instance_name))
2303
    if instance is None:
2304
      raise errors.OpPrereqError("Instance '%s' not known" %
2305
                                 self.op.instance_name)
2306
    if instance.status != "down":
2307
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2308
                                 self.op.instance_name)
2309
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2310
    if remote_info:
2311
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2312
                                 (self.op.instance_name,
2313
                                  instance.primary_node))
2314
    self.instance = instance
2315

    
2316
    # new name verification
2317
    name_info = utils.HostInfo(self.op.new_name)
2318

    
2319
    self.op.new_name = new_name = name_info.name
2320
    instance_list = self.cfg.GetInstanceList()
2321
    if new_name in instance_list:
2322
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2323
                                 new_name)
2324

    
2325
    if not getattr(self.op, "ignore_ip", False):
2326
      command = ["fping", "-q", name_info.ip]
2327
      result = utils.RunCmd(command)
2328
      if not result.failed:
2329
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2330
                                   (name_info.ip, new_name))
2331

    
2332

    
2333
  def Exec(self, feedback_fn):
2334
    """Reinstall the instance.
2335

2336
    """
2337
    inst = self.instance
2338
    old_name = inst.name
2339

    
2340
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2341

    
2342
    # re-read the instance from the configuration after rename
2343
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2344

    
2345
    _StartInstanceDisks(self.cfg, inst, None)
2346
    try:
2347
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2348
                                          "sda", "sdb"):
2349
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2350
               " instance has been renamed in Ganeti)" %
2351
               (inst.name, inst.primary_node))
2352
        logger.Error(msg)
2353
    finally:
2354
      _ShutdownInstanceDisks(inst, self.cfg)
2355

    
2356

    
2357
class LURemoveInstance(LogicalUnit):
2358
  """Remove an instance.
2359

2360
  """
2361
  HPATH = "instance-remove"
2362
  HTYPE = constants.HTYPE_INSTANCE
2363
  _OP_REQP = ["instance_name"]
2364

    
2365
  def BuildHooksEnv(self):
2366
    """Build hooks env.
2367

2368
    This runs on master, primary and secondary nodes of the instance.
2369

2370
    """
2371
    env = _BuildInstanceHookEnvByObject(self.instance)
2372
    nl = [self.sstore.GetMasterNode()]
2373
    return env, nl, nl
2374

    
2375
  def CheckPrereq(self):
2376
    """Check prerequisites.
2377

2378
    This checks that the instance is in the cluster.
2379

2380
    """
2381
    instance = self.cfg.GetInstanceInfo(
2382
      self.cfg.ExpandInstanceName(self.op.instance_name))
2383
    if instance is None:
2384
      raise errors.OpPrereqError("Instance '%s' not known" %
2385
                                 self.op.instance_name)
2386
    self.instance = instance
2387

    
2388
  def Exec(self, feedback_fn):
2389
    """Remove the instance.
2390

2391
    """
2392
    instance = self.instance
2393
    logger.Info("shutting down instance %s on node %s" %
2394
                (instance.name, instance.primary_node))
2395

    
2396
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2397
      if self.op.ignore_failures:
2398
        feedback_fn("Warning: can't shutdown instance")
2399
      else:
2400
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2401
                                 (instance.name, instance.primary_node))
2402

    
2403
    logger.Info("removing block devices for instance %s" % instance.name)
2404

    
2405
    if not _RemoveDisks(instance, self.cfg):
2406
      if self.op.ignore_failures:
2407
        feedback_fn("Warning: can't remove instance's disks")
2408
      else:
2409
        raise errors.OpExecError("Can't remove instance's disks")
2410

    
2411
    logger.Info("removing instance %s out of cluster config" % instance.name)
2412

    
2413
    self.cfg.RemoveInstance(instance.name)
2414

    
2415

    
2416
class LUQueryInstances(NoHooksLU):
2417
  """Logical unit for querying instances.
2418

2419
  """
2420
  _OP_REQP = ["output_fields", "names"]
2421

    
2422
  def CheckPrereq(self):
2423
    """Check prerequisites.
2424

2425
    This checks that the fields required are valid output fields.
2426

2427
    """
2428
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2429
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2430
                               "admin_state", "admin_ram",
2431
                               "disk_template", "ip", "mac", "bridge",
2432
                               "sda_size", "sdb_size", "vcpus"],
2433
                       dynamic=self.dynamic_fields,
2434
                       selected=self.op.output_fields)
2435

    
2436
    self.wanted = _GetWantedInstances(self, self.op.names)
2437

    
2438
  def Exec(self, feedback_fn):
2439
    """Computes the list of nodes and their attributes.
2440

2441
    """
2442
    instance_names = self.wanted
2443
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2444
                     in instance_names]
2445

    
2446
    # begin data gathering
2447

    
2448
    nodes = frozenset([inst.primary_node for inst in instance_list])
2449

    
2450
    bad_nodes = []
2451
    if self.dynamic_fields.intersection(self.op.output_fields):
2452
      live_data = {}
2453
      node_data = rpc.call_all_instances_info(nodes)
2454
      for name in nodes:
2455
        result = node_data[name]
2456
        if result:
2457
          live_data.update(result)
2458
        elif result == False:
2459
          bad_nodes.append(name)
2460
        # else no instance is alive
2461
    else:
2462
      live_data = dict([(name, {}) for name in instance_names])
2463

    
2464
    # end data gathering
2465

    
2466
    output = []
2467
    for instance in instance_list:
2468
      iout = []
2469
      for field in self.op.output_fields:
2470
        if field == "name":
2471
          val = instance.name
2472
        elif field == "os":
2473
          val = instance.os
2474
        elif field == "pnode":
2475
          val = instance.primary_node
2476
        elif field == "snodes":
2477
          val = list(instance.secondary_nodes)
2478
        elif field == "admin_state":
2479
          val = (instance.status != "down")
2480
        elif field == "oper_state":
2481
          if instance.primary_node in bad_nodes:
2482
            val = None
2483
          else:
2484
            val = bool(live_data.get(instance.name))
2485
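        # the status field combines the configured (admin) state with
        # the live state: ERROR_nodedown if the primary node could not
        # be contacted, otherwise running, ERROR_up, ERROR_down or
        # ADMIN_down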
        elif field == "status":
2486
          if instance.primary_node in bad_nodes:
2487
            val = "ERROR_nodedown"
2488
          else:
2489
            running = bool(live_data.get(instance.name))
2490
            if running:
2491
              if instance.status != "down":
2492
                val = "running"
2493
              else:
2494
                val = "ERROR_up"
2495
            else:
2496
              if instance.status != "down":
2497
                val = "ERROR_down"
2498
              else:
2499
                val = "ADMIN_down"
2500
        elif field == "admin_ram":
2501
          val = instance.memory
2502
        elif field == "oper_ram":
2503
          if instance.primary_node in bad_nodes:
2504
            val = None
2505
          elif instance.name in live_data:
2506
            val = live_data[instance.name].get("memory", "?")
2507
          else:
2508
            val = "-"
2509
        elif field == "disk_template":
2510
          val = instance.disk_template
2511
        elif field == "ip":
2512
          val = instance.nics[0].ip
2513
        elif field == "bridge":
2514
          val = instance.nics[0].bridge
2515
        elif field == "mac":
2516
          val = instance.nics[0].mac
2517
        elif field == "sda_size" or field == "sdb_size":
2518
          disk = instance.FindDisk(field[:3])
2519
          if disk is None:
2520
            val = None
2521
          else:
2522
            val = disk.size
2523
        elif field == "vcpus":
2524
          val = instance.vcpus
2525
        else:
2526
          raise errors.ParameterError(field)
2527
        iout.append(val)
2528
      output.append(iout)
2529

    
2530
    return output
2531

    
2532

    
2533
class LUFailoverInstance(LogicalUnit):
2534
  """Failover an instance.
2535

2536
  """
2537
  HPATH = "instance-failover"
2538
  HTYPE = constants.HTYPE_INSTANCE
2539
  _OP_REQP = ["instance_name", "ignore_consistency"]
2540

    
2541
  def BuildHooksEnv(self):
2542
    """Build hooks env.
2543

2544
    This runs on master, primary and secondary nodes of the instance.
2545

2546
    """
2547
    env = {
2548
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2549
      }
2550
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2551
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2552
    return env, nl, nl
2553

    
2554
  def CheckPrereq(self):
2555
    """Check prerequisites.
2556

2557
    This checks that the instance is in the cluster.
2558

2559
    """
2560
    instance = self.cfg.GetInstanceInfo(
2561
      self.cfg.ExpandInstanceName(self.op.instance_name))
2562
    if instance is None:
2563
      raise errors.OpPrereqError("Instance '%s' not known" %
2564
                                 self.op.instance_name)
2565

    
2566
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2567
      raise errors.OpPrereqError("Instance's disk layout is not"
2568
                                 " network mirrored, cannot failover.")
2569

    
2570
    secondary_nodes = instance.secondary_nodes
2571
    if not secondary_nodes:
2572
      raise errors.ProgrammerError("no secondary node but using "
2573
                                   "DT_REMOTE_RAID1 template")
2574

    
2575
    target_node = secondary_nodes[0]
2576
    # check memory requirements on the secondary node
2577
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2578
                         instance.name, instance.memory)
2579

    
2580
    # check bridge existence
2581
    brlist = [nic.bridge for nic in instance.nics]
2582
    if not rpc.call_bridges_exist(target_node, brlist):
2583
      raise errors.OpPrereqError("One or more target bridges %s does not"
2584
                                 " exist on destination node '%s'" %
2585
                                 (brlist, target_node))
2586

    
2587
    self.instance = instance
2588

    
2589
  def Exec(self, feedback_fn):
2590
    """Failover an instance.
2591

2592
    The failover is done by shutting it down on its present node and
2593
    starting it on the secondary.
2594

2595
    """
2596
    instance = self.instance
2597

    
2598
    source_node = instance.primary_node
2599
    target_node = instance.secondary_nodes[0]
2600

    
2601
    feedback_fn("* checking disk consistency between source and target")
2602
    for dev in instance.disks:
2603
      # for remote_raid1, these are md over drbd
2604
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2605
        if not self.op.ignore_consistency:
2606
          raise errors.OpExecError("Disk %s is degraded on target node,"
2607
                                   " aborting failover." % dev.iv_name)
2608

    
2609
    feedback_fn("* shutting down instance on source node")
2610
    logger.Info("Shutting down instance %s on node %s" %
2611
                (instance.name, source_node))
2612

    
2613
    if not rpc.call_instance_shutdown(source_node, instance):
2614
      if self.op.ignore_consistency:
2615
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2616
                     " anyway. Please make sure node %s is down"  %
2617
                     (instance.name, source_node, source_node))
2618
      else:
2619
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2620
                                 (instance.name, source_node))
2621

    
2622
    feedback_fn("* deactivating the instance's disks on source node")
2623
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2624
      raise errors.OpExecError("Can't shut down the instance's disks.")
2625

    
2626
    instance.primary_node = target_node
2627
    # distribute new instance config to the other nodes
2628
    self.cfg.AddInstance(instance)
2629

    
2630
    feedback_fn("* activating the instance's disks on target node")
2631
    logger.Info("Starting instance %s on node %s" %
2632
                (instance.name, target_node))
2633

    
2634
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2635
                                             ignore_secondaries=True)
2636
    if not disks_ok:
2637
      _ShutdownInstanceDisks(instance, self.cfg)
2638
      raise errors.OpExecError("Can't activate the instance's disks")
2639

    
2640
    feedback_fn("* starting the instance on the target node")
2641
    if not rpc.call_instance_start(target_node, instance, None):
2642
      _ShutdownInstanceDisks(instance, self.cfg)
2643
      raise errors.OpExecError("Could not start instance %s on node %s." %
2644
                               (instance.name, target_node))
2645

    
2646

    
2647
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2648
  """Create a tree of block devices on the primary node.
2649

2650
  This always creates all devices.
2651

2652
  """
2653
  if device.children:
2654
    for child in device.children:
2655
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2656
        return False
2657

    
2658
  cfg.SetDiskID(device, node)
2659
  new_id = rpc.call_blockdev_create(node, device, device.size,
2660
                                    instance.name, True, info)
2661
  if not new_id:
2662
    return False
2663
  if device.physical_id is None:
2664
    device.physical_id = new_id
2665
  return True
2666

    
2667

    
2668
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2669
  """Create a tree of block devices on a secondary node.
2670

2671
  If this device type has to be created on secondaries, create it and
2672
  all its children.
2673

2674
  If not, just recurse to children keeping the same 'force' value.
2675

2676
  """
2677
  if device.CreateOnSecondary():
2678
    force = True
2679
  if device.children:
2680
    for child in device.children:
2681
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2682
                                        child, force, info):
2683
        return False
2684

    
2685
  if not force:
2686
    return True
2687
  cfg.SetDiskID(device, node)
2688
  new_id = rpc.call_blockdev_create(node, device, device.size,
2689
                                    instance.name, False, info)
2690
  if not new_id:
2691
    return False
2692
  if device.physical_id is None:
2693
    device.physical_id = new_id
2694
  return True
2695

    
2696

    
2697
def _GenerateUniqueNames(cfg, exts):
2698
  """Generate a suitable LV name.
2699

2700
  This will generate a unique logical volume name for each given suffix.
2701

2702
  """
2703
  results = []
2704
  for val in exts:
2705
    new_id = cfg.GenerateUniqueID()
2706
    results.append("%s%s" % (new_id, val))
2707
  return results
2708

    
2709

    
2710
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2711
  """Generate a drbd device complete with its children.
2712

2713
  """
2714
  port = cfg.AllocatePort()
2715
  vgname = cfg.GetVGName()
2716
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2717
                          logical_id=(vgname, names[0]))
2718
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2719
                          logical_id=(vgname, names[1]))
2720
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2721
                          logical_id = (primary, secondary, port),
2722
                          children = [dev_data, dev_meta])
2723
  return drbd_dev
2724

    
2725

    
2726
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2727
  """Generate a drbd8 device complete with its children.
2728

2729
  """
2730
  port = cfg.AllocatePort()
2731
  vgname = cfg.GetVGName()
2732
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2733
                          logical_id=(vgname, names[0]))
2734
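  # the DRBD metadata is kept on a separate, fixed-size 128 MB LV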
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2735
                          logical_id=(vgname, names[1]))
2736
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2737
                          logical_id = (primary, secondary, port),
2738
                          children = [dev_data, dev_meta],
2739
                          iv_name=iv_name)
2740
  return drbd_dev
2741

    
2742
def _GenerateDiskTemplate(cfg, template_name,
2743
                          instance_name, primary_node,
2744
                          secondary_nodes, disk_sz, swap_sz):
2745
  """Generate the entire disk layout for a given template type.
2746

2747
  """
2748
  #TODO: compute space requirements
2749

    
2750
  vgname = cfg.GetVGName()
2751
  if template_name == constants.DT_DISKLESS:
2752
    disks = []
2753
  elif template_name == constants.DT_PLAIN:
2754
    if len(secondary_nodes) != 0:
2755
      raise errors.ProgrammerError("Wrong template configuration")
2756

    
2757
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2758
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2759
                           logical_id=(vgname, names[0]),
2760
                           iv_name = "sda")
2761
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2762
                           logical_id=(vgname, names[1]),
2763
                           iv_name = "sdb")
2764
    disks = [sda_dev, sdb_dev]
2765
  elif template_name == constants.DT_LOCAL_RAID1:
2766
    if len(secondary_nodes) != 0:
2767
      raise errors.ProgrammerError("Wrong template configuration")
2768

    
2769

    
2770
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2771
                                       ".sdb_m1", ".sdb_m2"])
2772
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2773
                              logical_id=(vgname, names[0]))
2774
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2775
                              logical_id=(vgname, names[1]))
2776
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2777
                              size=disk_sz,
2778
                              children = [sda_dev_m1, sda_dev_m2])
2779
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2780
                              logical_id=(vgname, names[2]))
2781
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2782
                              logical_id=(vgname, names[3]))
2783
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2784
                              size=swap_sz,
2785
                              children = [sdb_dev_m1, sdb_dev_m2])
2786
    disks = [md_sda_dev, md_sdb_dev]
2787
  elif template_name == constants.DT_REMOTE_RAID1:
2788
    if len(secondary_nodes) != 1:
2789
      raise errors.ProgrammerError("Wrong template configuration")
2790
    remote_node = secondary_nodes[0]
2791
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2792
                                       ".sdb_data", ".sdb_meta"])
2793
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2794
                                         disk_sz, names[0:2])
2795
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2796
                              children = [drbd_sda_dev], size=disk_sz)
2797
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2798
                                         swap_sz, names[2:4])
2799
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2800
                              children = [drbd_sdb_dev], size=swap_sz)
2801
    disks = [md_sda_dev, md_sdb_dev]
2802
  elif template_name == constants.DT_DRBD8:
2803
    if len(secondary_nodes) != 1:
2804
      raise errors.ProgrammerError("Wrong template configuration")
2805
    remote_node = secondary_nodes[0]
2806
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2807
                                       ".sdb_data", ".sdb_meta"])
2808
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2809
                                         disk_sz, names[0:2], "sda")
2810
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2811
                                         swap_sz, names[2:4], "sdb")
2812
    disks = [drbd_sda_dev, drbd_sdb_dev]
2813
  else:
2814
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2815
  return disks
2816

    
2817

    
2818
def _GetInstanceInfoText(instance):
2819
  """Compute that text that should be added to the disk's metadata.
2820

2821
  """
2822
  return "originstname+%s" % instance.name
2823

    
2824

    
2825
def _CreateDisks(cfg, instance):
2826
  """Create all disks for an instance.
2827

2828
  This abstracts away some work from AddInstance.
2829

2830
  Args:
2831
    instance: the instance object
2832

2833
  Returns:
2834
    True or False showing the success of the creation process
2835

2836
  """
2837
  info = _GetInstanceInfoText(instance)
2838

    
2839
  for device in instance.disks:
2840
    logger.Info("creating volume %s for instance %s" %
2841
              (device.iv_name, instance.name))
2842
    #HARDCODE
2843
    for secondary_node in instance.secondary_nodes:
2844
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2845
                                        device, False, info):
2846
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2847
                     (device.iv_name, device, secondary_node))
2848
        return False
2849
    #HARDCODE
2850
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2851
                                    instance, device, info):
2852
      logger.Error("failed to create volume %s on primary!" %
2853
                   device.iv_name)
2854
      return False
2855
  return True
2856

    
2857

    
2858
def _RemoveDisks(instance, cfg):
2859
  """Remove all disks for an instance.
2860

2861
  This abstracts away some work from `AddInstance()` and
2862
  `RemoveInstance()`. Note that in case some of the devices couldn't
2863
  be removed, the removal will continue with the other ones (compare
2864
  with `_CreateDisks()`).
2865

2866
  Args:
2867
    instance: the instance object
2868

2869
  Returns:
2870
    True or False showing the success of the removal process
2871

2872
  """
2873
  logger.Info("removing block devices for instance %s" % instance.name)
2874

    
2875
  result = True
2876
  for device in instance.disks:
2877
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2878
      cfg.SetDiskID(disk, node)
2879
      if not rpc.call_blockdev_remove(node, disk):
2880
        logger.Error("could not remove block device %s on node %s,"
2881
                     " continuing anyway" %
2882
                     (device.iv_name, node))
2883
        result = False
2884
  return result
2885

    
2886

    
2887
class LUCreateInstance(LogicalUnit):
2888
  """Create an instance.
2889

2890
  """
2891
  HPATH = "instance-add"
2892
  HTYPE = constants.HTYPE_INSTANCE
2893
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2894
              "disk_template", "swap_size", "mode", "start", "vcpus",
2895
              "wait_for_sync", "ip_check", "mac"]
2896

    
2897
  def BuildHooksEnv(self):
2898
    """Build hooks env.
2899

2900
    This runs on master, primary and secondary nodes of the instance.
2901

2902
    """
2903
    env = {
2904
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2905
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2906
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2907
      "INSTANCE_ADD_MODE": self.op.mode,
2908
      }
2909
    if self.op.mode == constants.INSTANCE_IMPORT:
2910
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2911
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2912
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2913

    
2914
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2915
      primary_node=self.op.pnode,
2916
      secondary_nodes=self.secondaries,
2917
      status=self.instance_status,
2918
      os_type=self.op.os_type,
2919
      memory=self.op.mem_size,
2920
      vcpus=self.op.vcpus,
2921
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2922
    ))
2923

    
2924
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2925
          self.secondaries)
2926
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: None,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
    }

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError("Disk template '%s' size requirement"
                                   " is unknown" % self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # instance verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
                       constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


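# --- Illustrative sketch (editor's addition, not part of the original LUs) ---
# The free-space check in LUCreateInstance.CheckPrereq reduces to the table
# below: plain LVM needs disk+swap, local RAID1 twice that, and the DRBD-based
# templates add 256 MB of metadata overhead. The helper name and the plain
# string keys (stand-ins for the constants.DT_* values) are hypothetical.
def _ExampleRequiredSpace(disk_template, disk_size, swap_size):
  """Return the MB of free VG space a template would need (sketch only)."""
  base = disk_size + swap_size
  table = {
    "diskless": None,            # no local storage is allocated at all
    "plain": base,               # one LV for data, one for swap
    "local_raid1": base * 2,     # two local copies of everything
    "remote_raid1": base + 256,  # md over drbd7, plus metadata
    "drbd8": base + 256,         # drbd8, plus metadata
  }
  return table[disk_template]

# e.g. _ExampleRequiredSpace("drbd8", 10240, 4096) == 14592 MB per node

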
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)
    # build ssh cmdline
    argv = ["ssh", "-q", "-t"]
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
    argv.extend(ssh.BATCH_MODE_OPTS)
    argv.append(node)
    argv.append(console_cmd)
    return "ssh", argv


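# --- Illustrative sketch (editor's addition) ---
# LUConnectConsole.Exec does not run anything itself; it hands back a
# ("ssh", argv) pair for the caller on the master node. A hypothetical
# consumer could render that into a displayable command line like this:
def _ExampleFormatConsoleCommand(result):
  """Format the (program, argv) pair returned by LUConnectConsole.Exec."""
  _, argv = result
  parts = []
  for arg in argv:
    if " " in arg:
      # naive quoting, good enough for display purposes only
      parts.append('"%s"' % arg)
    else:
      parts.append(arg)
  return " ".join(parts)

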
class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave devices."
                                 " This would create a 3-disk raid1 which we"
                                 " don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
                                    instance, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance, self.proc)

    return 0


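# --- Illustrative sketch (editor's addition) ---
# LUAddMDDRBDComponent.Exec follows a create/rollback pattern: build the new
# device on the secondary first, then on the primary, and undo the secondary
# copy if the primary-side step fails. The same pattern in isolation, using
# hypothetical callables instead of the real block-device RPC calls:
def _ExampleCreateWithRollback(create_secondary, create_primary,
                               remove_secondary):
  """Run two creation steps, undoing the first if the second fails."""
  if not create_secondary():
    raise RuntimeError("secondary creation failed, nothing to roll back")
  if not create_primary():
    remove_secondary()  # best-effort cleanup, mirroring the LU's behaviour
    raise RuntimeError("primary creation failed, secondary rolled back")

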
class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechildren(instance.primary_node,
                                            disk, [child]):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


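# --- Illustrative sketch (editor's addition) ---
# In LURemoveMDDRBDComponent.CheckPrereq the old secondary is simply "the
# node in the child's logical_id that is not the primary". The same idea as
# a standalone helper, assuming the (node_a, node_b, port) layout used by
# the DRBD7 child devices above:
def _ExamplePeerNode(logical_id, primary_node):
  """Return the DRBD peer of primary_node from a (node_a, node_b, port) id."""
  if logical_id[0] == primary_node:
    return logical_id[1]
  return logical_id[0]

# e.g. _ExamplePeerNode(("node1", "node2", 11000), "node1") == "node2"

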
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)


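# --- Illustrative sketch (editor's addition) ---
# The detach/rename/attach step in _ExecD8DiskOnly renames every old LV to
# "<name>_replaced-<suffix>" (the LU uses int(time.time()) as the suffix)
# before giving the new LV the old name, so the DRBD device keeps its
# expected backing-store names. The renaming rule in isolation, assuming a
# (vg_name, lv_name) physical_id pair:
def _ExampleReplacedName(physical_id, suffix):
  """Return the (vg, lv) pair an old LV is renamed to before removal."""
  vg_name, lv_name = physical_id
  return (vg_name, lv_name + "_replaced-%s" % suffix)

# e.g. _ExampleReplacedName(("xenvg", "sda_data"), 1190000000)
#      == ("xenvg", "sda_data_replaced-1190000000")

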
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,
        }

      result[instance.name] = idict

    return result


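# --- Illustrative sketch (editor's addition) ---
# _ComputeDiskStatus returns one dict per device and repeats the same
# structure under the "children" key, so consumers can walk the result
# recursively. A hypothetical helper counting the leaf (child-less) devices
# in such a tree:
def _ExampleCountLeafDevices(dev_status):
  """Count leaf devices in a dict as returned by _ComputeDiskStatus."""
  children = dev_status.get("children") or []
  if not children:
    return 1
  total = 0
  for child in children:
    total += _ExampleCountLeafDevices(child)
  return total

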
class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result


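# --- Illustrative sketch (editor's addition) ---
# Both LUCreateInstance and LUSetInstanceParms validate hvm_boot_order by
# stripping the allowed characters and checking that nothing is left over;
# str.strip(chars) removes those characters from both ends, so the result is
# empty exactly when the string contains nothing else. The same check as a
# standalone predicate (hypothetical helper name):
def _ExampleValidBootOrder(boot_order):
  """Return True if boot_order consists only of the characters a, c, d, n."""
  return len(boot_order.strip("acdn")) == 0

# e.g. _ExampleValidBootOrder("cd") is True, _ExampleValidBootOrder("cx") is False

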
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


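# Illustrative sketch (hypothetical helper, not part of the original module):
# LUQueryExports.Exec above returns a dictionary mapping each node name to
# the list of exports stored on that node; flattening it into (node, export)
# pairs could look like this:
def _FlattenExportList(exports):
  """Return a list of (node, export name) tuples from a node->list dict."""
  pairs = []
  for node in exports:
    for name in exports[node]:
      pairs.append((node, name))
  return pairs

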
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


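# Illustrative sketch (hypothetical helper, not part of the original module):
# the cleanup loop at the end of LUExportInstance.Exec above iterates over
# the dict returned by OpQueryExports (node -> list of export names) and
# removes stale exports; selecting the nodes that still hold one could be
# written as:
def _NodesWithStaleExport(exportlist, instance_name):
  """Return the nodes whose export list contains the given instance name."""
  return [node for node in exportlist if instance_name in exportlist[node]]

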
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


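# Quick reference for the kind-based dispatch in TagsLU.CheckPrereq above:
#   constants.TAG_CLUSTER  -> cfg.GetClusterInfo()
#   constants.TAG_NODE     -> cfg.GetNodeInfo(cfg.ExpandNodeName(name))
#   constants.TAG_INSTANCE -> cfg.GetInstanceInfo(cfg.ExpandInstanceName(name))

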
class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs matching the search pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


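# Example of the value returned by LUSearchTags.Exec above (all names here
# are hypothetical): with the pattern "^web" it would return one (path, tag)
# pair per matching tag, e.g.
#   [("/instances/inst1.example.com", "webserver"),
#    ("/nodes/node1.example.com", "web-frontend")]

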
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tags.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


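# Illustrative walk-through of the set arithmetic in LUDelTags.CheckPrereq
# above (values are hypothetical): with del_tags = frozenset(["a", "b"]) and
# cur_tags containing only "a" and "c", "del_tags <= cur_tags" is False
# because "b" is missing, and "del_tags - cur_tags" is frozenset(["b"]) --
# exactly the tags reported in the OpPrereqError.

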
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))