root / lib / cmdlib.py @ b15d625f

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
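
# Example (illustrative sketch only, not part of the original module): a
# minimal LogicalUnit subclass that follows the rules documented above. The
# hook path and class name used here are hypothetical.
#
#   class LUExampleNoop(LogicalUnit):
#     """Example no-op LU."""
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = []
#
#     def BuildHooksEnv(self):
#       # env dict, pre-hook node list, post-hook node list
#       return {"OP_TARGET": self.sstore.GetClusterName()}, [], []
#
#     def CheckPrereq(self):
#       pass  # no prerequisites; must not change cluster state
#
#     def Exec(self, feedback_fn):
#       feedback_fn("nothing to do")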


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
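
# Usage sketch (illustrative only, not part of the original module):
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])   # passes silently
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "bogus"])   # raises OpPrereqError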


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
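
# For illustration (hypothetical values, not part of the original module):
#
#   _BuildInstanceHookEnv("inst1", "node1", ["node2"], "debian-etch", "up",
#                         128, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:11:22:33")])
#
# yields keys such as OP_TARGET, INSTANCE_NAME, INSTANCE_PRIMARY,
# INSTANCE_SECONDARIES, INSTANCE_OS_TYPE, INSTANCE_STATUS, INSTANCE_MEMORY,
# INSTANCE_VCPUS, INSTANCE_NIC0_IP, INSTANCE_NIC0_BRIDGE,
# INSTANCE_NIC0_HWADDR and INSTANCE_NIC_COUNT; per the
# LogicalUnit.BuildHooksEnv docstring, the hooks runner adds the GANETI_
# prefix itself.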


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  for rawline in f:
    logger.Debug('read %s' % (repr(rawline),))

    parts = rawline.rstrip('\r\n').split()

    # Ignore unwanted lines
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      fields = parts[0].split(',')
      key = parts[2]

      haveall = True
      havesome = False
      for spec in [ ip, fullnode ]:
        if spec not in fields:
          haveall = False
        if spec in fields:
          havesome = True

      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      if haveall and key == pubkey:
        inthere = True
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
        continue

      if havesome and (not haveall or key != pubkey):
        removed = True
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
        continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None
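
# For example (illustrative): with vglist = {"xenvg": 40960} and
# vgname = "xenvg" this returns None; with an empty vglist it returns
# "volume group 'xenvg' missing", and with {"xenvg": 10240} it returns the
# "too small" message, since at least 20480 MiB are required.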


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.


  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
                      (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
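
# The returned 4-tuple (illustrative summary of the code above):
#   - res_nodes: nodes that could not be contacted or returned invalid data
#   - res_nlvm: map of node -> LVM error message
#   - res_instances: instances with at least one LV that is not online
#   - res_missing: map of instance name -> list of (node, volume) pairs that
#     should exist but were not reported by the node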


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
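
# Note (illustrative, based on the loop above): each entry returned by
# call_blockdev_getmirrorstatus is unpacked as
# (perc_done, est_time, is_degraded, ldisk); perc_done is None once a device
# is fully synced, so the loop keeps polling while any device still reports a
# percentage, sleeping up to 60 seconds between rounds.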


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Can't get any data from node %s" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    return

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
1630
  """Failover the master node to the current node.
1631

1632
  This is a special LU in that it must run on a non-master node.
1633

1634
  """
1635
  HPATH = "master-failover"
1636
  HTYPE = constants.HTYPE_CLUSTER
1637
  REQ_MASTER = False
1638
  _OP_REQP = []
1639

    
1640
  def BuildHooksEnv(self):
1641
    """Build hooks env.
1642

1643
    This will run on the new master only in the pre phase, and on all
1644
    the nodes in the post phase.
1645

1646
    """
1647
    env = {
1648
      "OP_TARGET": self.new_master,
1649
      "NEW_MASTER": self.new_master,
1650
      "OLD_MASTER": self.old_master,
1651
      }
1652
    return env, [self.new_master], self.cfg.GetNodeList()
1653

    
1654
  def CheckPrereq(self):
1655
    """Check prerequisites.
1656

1657
    This checks that we are not already the master.
1658

1659
    """
1660
    self.new_master = utils.HostInfo().name
1661
    self.old_master = self.sstore.GetMasterNode()
1662

    
1663
    if self.old_master == self.new_master:
1664
      raise errors.OpPrereqError("This commands must be run on the node"
1665
                                 " where you want the new master to be."
1666
                                 " %s is already the master" %
1667
                                 self.old_master)
1668

    
1669
  def Exec(self, feedback_fn):
1670
    """Failover the master node.
1671

1672
    This command, when run on a non-master node, will cause the current
1673
    master to cease being master, and the non-master to become the new
1674
    master.
1675

1676
    """
1677
    #TODO: do not rely on gethostname returning the FQDN
1678
    logger.Info("setting master to %s, old master: %s" %
1679
                (self.new_master, self.old_master))
1680

    
1681
    if not rpc.call_node_stop_master(self.old_master):
1682
      logger.Error("could disable the master role on the old master"
1683
                   " %s, please disable manually" % self.old_master)
1684

    
1685
    ss = self.sstore
1686
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1687
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1688
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1689
      logger.Error("could not distribute the new simple store master file"
1690
                   " to the other nodes, please check.")
1691

    
1692
    if not rpc.call_node_start_master(self.new_master):
1693
      logger.Error("could not start the master role on the new master"
1694
                   " %s, please check" % self.new_master)
1695
      feedback_fn("Error in activating the master IP on the new master,"
1696
                  " please fix manually.")
1697

    
1698

    
1699

    
1700
class LUQueryClusterInfo(NoHooksLU):
1701
  """Query cluster configuration.
1702

1703
  """
1704
  _OP_REQP = []
1705
  REQ_MASTER = False
1706

    
1707
  def CheckPrereq(self):
1708
    """No prerequsites needed for this LU.
1709

1710
    """
1711
    pass
1712

    
1713
  def Exec(self, feedback_fn):
1714
    """Return cluster config.
1715

1716
    """
1717
    result = {
1718
      "name": self.sstore.GetClusterName(),
1719
      "software_version": constants.RELEASE_VERSION,
1720
      "protocol_version": constants.PROTOCOL_VERSION,
1721
      "config_version": constants.CONFIG_VERSION,
1722
      "os_api_version": constants.OS_API_VERSION,
1723
      "export_version": constants.EXPORT_VERSION,
1724
      "master": self.sstore.GetMasterNode(),
1725
      "architecture": (platform.architecture()[0], platform.machine()),
1726
      }
1727

    
1728
    return result
1729

    
1730

    
1731
class LUClusterCopyFile(NoHooksLU):
1732
  """Copy file to cluster.
1733

1734
  """
1735
  _OP_REQP = ["nodes", "filename"]
1736

    
1737
  def CheckPrereq(self):
1738
    """Check prerequisites.
1739

1740
    It should check that the named file exists and that the given list
1741
    of nodes is valid.
1742

1743
    """
1744
    if not os.path.exists(self.op.filename):
1745
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1746

    
1747
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1748

    
1749
  def Exec(self, feedback_fn):
1750
    """Copy a file from master to some nodes.
1751

1752
    The file to copy is given by self.op.filename and the target nodes
1753
    by self.op.nodes (resolved to full names in CheckPrereq); an empty
1754
    node list means all cluster nodes. The master node itself is
1755
    skipped, since the file is already present there, and per-node
1756
    copy failures are logged but do not abort the operation.
1757

1758
    """
1759
    filename = self.op.filename
1760

    
1761
    myname = utils.HostInfo().name
1762

    
1763
    for node in self.nodes:
1764
      if node == myname:
1765
        continue
1766
      if not ssh.CopyFileToNode(node, filename):
1767
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1768

    
1769

    
1770
class LUDumpClusterConfig(NoHooksLU):
1771
  """Return a text-representation of the cluster-config.
1772

1773
  """
1774
  _OP_REQP = []
1775

    
1776
  def CheckPrereq(self):
1777
    """No prerequisites.
1778

1779
    """
1780
    pass
1781

    
1782
  def Exec(self, feedback_fn):
1783
    """Dump a representation of the cluster config to the standard output.
1784

1785
    """
1786
    return self.cfg.DumpConfig()
1787

    
1788

    
1789
class LURunClusterCommand(NoHooksLU):
1790
  """Run a command on some nodes.
1791

1792
  """
1793
  _OP_REQP = ["command", "nodes"]
1794

    
1795
  def CheckPrereq(self):
1796
    """Check prerequisites.
1797

1798
    It checks that the given list of nodes is valid.
1799

1800
    """
1801
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1802

    
1803
  def Exec(self, feedback_fn):
1804
    """Run a command on some nodes.
1805

1806
    """
1807
    data = []
1808
    for node in self.nodes:
1809
      result = ssh.SSHCall(node, "root", self.op.command)
1810
      data.append((node, result.output, result.exit_code))
1811

    
1812
    return data
1813
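    # The value returned above is a list of (node, output, exit_code)
    # tuples, for example (illustrative values only):
    #
    #   [("node1.example.com", "Linux node1 2.6.x ...\n", 0),
    #    ("node2.example.com", "", 255)]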

    
1814

    
1815
class LUActivateInstanceDisks(NoHooksLU):
1816
  """Bring up an instance's disks.
1817

1818
  """
1819
  _OP_REQP = ["instance_name"]
1820

    
1821
  def CheckPrereq(self):
1822
    """Check prerequisites.
1823

1824
    This checks that the instance is in the cluster.
1825

1826
    """
1827
    instance = self.cfg.GetInstanceInfo(
1828
      self.cfg.ExpandInstanceName(self.op.instance_name))
1829
    if instance is None:
1830
      raise errors.OpPrereqError("Instance '%s' not known" %
1831
                                 self.op.instance_name)
1832
    self.instance = instance
1833

    
1834

    
1835
  def Exec(self, feedback_fn):
1836
    """Activate the disks.
1837

1838
    """
1839
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1840
    if not disks_ok:
1841
      raise errors.OpExecError("Cannot activate block devices")
1842

    
1843
    return disks_info
1844

    
1845

    
1846
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1847
  """Prepare the block devices for an instance.
1848

1849
  This sets up the block devices on all nodes.
1850

1851
  Args:
1852
    instance: a ganeti.objects.Instance object
1853
    ignore_secondaries: if true, errors on secondary nodes won't result
1854
                        in an error return from the function
1855

1856
  Returns:
1857
    false if the operation failed
1858
    list of (host, instance_visible_name, node_visible_name) if the operation
1859
         succeeded with the mapping from node devices to instance devices
1860
  """
1861
  device_info = []
1862
  disks_ok = True
1863
  iname = instance.name
1864
  # With the two passes mechanism we try to reduce the window of
1865
  # opportunity for the race condition of switching DRBD to primary
1866
  # before handshaking occurred, but we do not eliminate it
1867

    
1868
  # The proper fix would be to wait (with some limits) until the
1869
  # connection has been made and drbd transitions from WFConnection
1870
  # into any other network-connected state (Connected, SyncTarget,
1871
  # SyncSource, etc.)
1872

    
1873
  # 1st pass, assemble on all nodes in secondary mode
1874
  for inst_disk in instance.disks:
1875
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1876
      cfg.SetDiskID(node_disk, node)
1877
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1878
      if not result:
1879
        logger.Error("could not prepare block device %s on node %s"
1880
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1881
        if not ignore_secondaries:
1882
          disks_ok = False
1883

    
1884
  # FIXME: race condition on drbd migration to primary
1885

    
1886
  # 2nd pass, do only the primary node
1887
  for inst_disk in instance.disks:
1888
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1889
      if node != instance.primary_node:
1890
        continue
1891
      cfg.SetDiskID(node_disk, node)
1892
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1893
      if not result:
1894
        logger.Error("could not prepare block device %s on node %s"
1895
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1896
        disks_ok = False
1897
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1898

    
1899
  # leave the disks configured for the primary node
1900
  # this is a workaround that would be fixed better by
1901
  # improving the logical/physical id handling
1902
  for disk in instance.disks:
1903
    cfg.SetDiskID(disk, instance.primary_node)
1904

    
1905
  return disks_ok, device_info
1906
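# Illustrative usage sketch, mirroring LUActivateInstanceDisks.Exec above
# ('cfg' is a ConfigWriter, 'instance' an objects.Instance):
#
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   # device_info holds one (primary_node, iv_name, result) tuple per
#   # disk, where result is whatever call_blockdev_assemble returned
#   # for the primary-node assembly.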

    
1907

    
1908
def _StartInstanceDisks(cfg, instance, force):
1909
  """Start the disks of an instance.
1910

1911
  """
1912
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1913
                                           ignore_secondaries=force)
1914
  if not disks_ok:
1915
    _ShutdownInstanceDisks(instance, cfg)
1916
    if force is not None and not force:
1917
      logger.Error("If the message above refers to a secondary node,"
1918
                   " you can retry the operation using '--force'.")
1919
    raise errors.OpExecError("Disk consistency error")
1920

    
1921

    
1922
class LUDeactivateInstanceDisks(NoHooksLU):
1923
  """Shutdown an instance's disks.
1924

1925
  """
1926
  _OP_REQP = ["instance_name"]
1927

    
1928
  def CheckPrereq(self):
1929
    """Check prerequisites.
1930

1931
    This checks that the instance is in the cluster.
1932

1933
    """
1934
    instance = self.cfg.GetInstanceInfo(
1935
      self.cfg.ExpandInstanceName(self.op.instance_name))
1936
    if instance is None:
1937
      raise errors.OpPrereqError("Instance '%s' not known" %
1938
                                 self.op.instance_name)
1939
    self.instance = instance
1940

    
1941
  def Exec(self, feedback_fn):
1942
    """Deactivate the disks
1943

1944
    """
1945
    instance = self.instance
1946
    ins_l = rpc.call_instance_list([instance.primary_node])
1947
    ins_l = ins_l[instance.primary_node]
1948
    if not isinstance(ins_l, list):
1949
      raise errors.OpExecError("Can't contact node '%s'" %
1950
                               instance.primary_node)
1951

    
1952
    if self.instance.name in ins_l:
1953
      raise errors.OpExecError("Instance is running, can't shutdown"
1954
                               " block devices.")
1955

    
1956
    _ShutdownInstanceDisks(instance, self.cfg)
1957

    
1958

    
1959
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1960
  """Shutdown block devices of an instance.
1961

1962
  This does the shutdown on all nodes of the instance.
1963

1964
  If ignore_primary is true, errors on the primary node are
1965
  ignored.
1966

1967
  """
1968
  result = True
1969
  for disk in instance.disks:
1970
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1971
      cfg.SetDiskID(top_disk, node)
1972
      if not rpc.call_blockdev_shutdown(node, top_disk):
1973
        logger.Error("could not shutdown block device %s on node %s" %
1974
                     (disk.iv_name, node))
1975
        if not ignore_primary or node != instance.primary_node:
1976
          result = False
1977
  return result
1978

    
1979

    
1980
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1981
  """Checks if a node has enough free memory.
1982

1983
  This function check if a given node has the needed amount of free
1984
  memory. In case the node has less memory or we cannot get the
1985
  information from the node, this function raise an OpPrereqError
1986
  exception.
1987

1988
  Args:
1989
    - cfg: a ConfigWriter instance
1990
    - node: the node name
1991
    - reason: string to use in the error message
1992
    - requested: the amount of memory in MiB
1993

1994
  """
1995
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1996
  if not nodeinfo or not isinstance(nodeinfo, dict):
1997
    raise errors.OpPrereqError("Could not contact node %s for resource"
1998
                             " information" % (node,))
1999

    
2000
  free_mem = nodeinfo[node].get('memory_free')
2001
  if not isinstance(free_mem, int):
2002
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2003
                             " was '%s'" % (node, free_mem))
2004
  if requested > free_mem:
2005
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2006
                             " needed %s MiB, available %s MiB" %
2007
                             (node, reason, requested, free_mem))
2008
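# Illustrative call, matching the pattern used by LUStartupInstance and
# LUFailoverInstance below (the node name here is a placeholder):
#
#   _CheckNodeFreeMemory(self.cfg, "node2.example.com",
#                        "starting instance %s" % instance.name,
#                        instance.memory)
#
# The helper returns nothing on success; on missing data or insufficient
# memory it raises errors.OpPrereqError, aborting the calling LU.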

    
2009

    
2010
class LUStartupInstance(LogicalUnit):
2011
  """Starts an instance.
2012

2013
  """
2014
  HPATH = "instance-start"
2015
  HTYPE = constants.HTYPE_INSTANCE
2016
  _OP_REQP = ["instance_name", "force"]
2017

    
2018
  def BuildHooksEnv(self):
2019
    """Build hooks env.
2020

2021
    This runs on master, primary and secondary nodes of the instance.
2022

2023
    """
2024
    env = {
2025
      "FORCE": self.op.force,
2026
      }
2027
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2028
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2029
          list(self.instance.secondary_nodes))
2030
    return env, nl, nl
2031

    
2032
  def CheckPrereq(self):
2033
    """Check prerequisites.
2034

2035
    This checks that the instance is in the cluster.
2036

2037
    """
2038
    instance = self.cfg.GetInstanceInfo(
2039
      self.cfg.ExpandInstanceName(self.op.instance_name))
2040
    if instance is None:
2041
      raise errors.OpPrereqError("Instance '%s' not known" %
2042
                                 self.op.instance_name)
2043

    
2044
    # check bridges existence
2045
    _CheckInstanceBridgesExist(instance)
2046

    
2047
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2048
                         "starting instance %s" % instance.name,
2049
                         instance.memory)
2050

    
2051
    self.instance = instance
2052
    self.op.instance_name = instance.name
2053

    
2054
  def Exec(self, feedback_fn):
2055
    """Start the instance.
2056

2057
    """
2058
    instance = self.instance
2059
    force = self.op.force
2060
    extra_args = getattr(self.op, "extra_args", "")
2061

    
2062
    node_current = instance.primary_node
2063

    
2064
    _StartInstanceDisks(self.cfg, instance, force)
2065

    
2066
    if not rpc.call_instance_start(node_current, instance, extra_args):
2067
      _ShutdownInstanceDisks(instance, self.cfg)
2068
      raise errors.OpExecError("Could not start instance")
2069

    
2070
    self.cfg.MarkInstanceUp(instance.name)
2071

    
2072

    
2073
class LURebootInstance(LogicalUnit):
2074
  """Reboot an instance.
2075

2076
  """
2077
  HPATH = "instance-reboot"
2078
  HTYPE = constants.HTYPE_INSTANCE
2079
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2080

    
2081
  def BuildHooksEnv(self):
2082
    """Build hooks env.
2083

2084
    This runs on master, primary and secondary nodes of the instance.
2085

2086
    """
2087
    env = {
2088
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2089
      }
2090
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2091
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2092
          list(self.instance.secondary_nodes))
2093
    return env, nl, nl
2094

    
2095
  def CheckPrereq(self):
2096
    """Check prerequisites.
2097

2098
    This checks that the instance is in the cluster.
2099

2100
    """
2101
    instance = self.cfg.GetInstanceInfo(
2102
      self.cfg.ExpandInstanceName(self.op.instance_name))
2103
    if instance is None:
2104
      raise errors.OpPrereqError("Instance '%s' not known" %
2105
                                 self.op.instance_name)
2106

    
2107
    # check bridges existence
2108
    _CheckInstanceBridgesExist(instance)
2109

    
2110
    self.instance = instance
2111
    self.op.instance_name = instance.name
2112

    
2113
  def Exec(self, feedback_fn):
2114
    """Reboot the instance.
2115

2116
    """
2117
    instance = self.instance
2118
    ignore_secondaries = self.op.ignore_secondaries
2119
    reboot_type = self.op.reboot_type
2120
    extra_args = getattr(self.op, "extra_args", "")
2121

    
2122
    node_current = instance.primary_node
2123

    
2124
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2125
                           constants.INSTANCE_REBOOT_HARD,
2126
                           constants.INSTANCE_REBOOT_FULL]:
2127
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2128
                                  (constants.INSTANCE_REBOOT_SOFT,
2129
                                   constants.INSTANCE_REBOOT_HARD,
2130
                                   constants.INSTANCE_REBOOT_FULL))
2131

    
2132
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2133
                       constants.INSTANCE_REBOOT_HARD]:
2134
      if not rpc.call_instance_reboot(node_current, instance,
2135
                                      reboot_type, extra_args):
2136
        raise errors.OpExecError("Could not reboot instance")
2137
    else:
2138
      if not rpc.call_instance_shutdown(node_current, instance):
2139
        raise errors.OpExecError("could not shutdown instance for full reboot")
2140
      _ShutdownInstanceDisks(instance, self.cfg)
2141
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2142
      if not rpc.call_instance_start(node_current, instance, extra_args):
2143
        _ShutdownInstanceDisks(instance, self.cfg)
2144
        raise errors.OpExecError("Could not start instance for full reboot")
2145

    
2146
    self.cfg.MarkInstanceUp(instance.name)
2147

    
2148

    
2149
class LUShutdownInstance(LogicalUnit):
2150
  """Shutdown an instance.
2151

2152
  """
2153
  HPATH = "instance-stop"
2154
  HTYPE = constants.HTYPE_INSTANCE
2155
  _OP_REQP = ["instance_name"]
2156

    
2157
  def BuildHooksEnv(self):
2158
    """Build hooks env.
2159

2160
    This runs on master, primary and secondary nodes of the instance.
2161

2162
    """
2163
    env = _BuildInstanceHookEnvByObject(self.instance)
2164
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2165
          list(self.instance.secondary_nodes))
2166
    return env, nl, nl
2167

    
2168
  def CheckPrereq(self):
2169
    """Check prerequisites.
2170

2171
    This checks that the instance is in the cluster.
2172

2173
    """
2174
    instance = self.cfg.GetInstanceInfo(
2175
      self.cfg.ExpandInstanceName(self.op.instance_name))
2176
    if instance is None:
2177
      raise errors.OpPrereqError("Instance '%s' not known" %
2178
                                 self.op.instance_name)
2179
    self.instance = instance
2180

    
2181
  def Exec(self, feedback_fn):
2182
    """Shutdown the instance.
2183

2184
    """
2185
    instance = self.instance
2186
    node_current = instance.primary_node
2187
    if not rpc.call_instance_shutdown(node_current, instance):
2188
      logger.Error("could not shutdown instance")
2189

    
2190
    self.cfg.MarkInstanceDown(instance.name)
2191
    _ShutdownInstanceDisks(instance, self.cfg)
2192

    
2193

    
2194
class LUReinstallInstance(LogicalUnit):
2195
  """Reinstall an instance.
2196

2197
  """
2198
  HPATH = "instance-reinstall"
2199
  HTYPE = constants.HTYPE_INSTANCE
2200
  _OP_REQP = ["instance_name"]
2201

    
2202
  def BuildHooksEnv(self):
2203
    """Build hooks env.
2204

2205
    This runs on master, primary and secondary nodes of the instance.
2206

2207
    """
2208
    env = _BuildInstanceHookEnvByObject(self.instance)
2209
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2210
          list(self.instance.secondary_nodes))
2211
    return env, nl, nl
2212

    
2213
  def CheckPrereq(self):
2214
    """Check prerequisites.
2215

2216
    This checks that the instance is in the cluster and is not running.
2217

2218
    """
2219
    instance = self.cfg.GetInstanceInfo(
2220
      self.cfg.ExpandInstanceName(self.op.instance_name))
2221
    if instance is None:
2222
      raise errors.OpPrereqError("Instance '%s' not known" %
2223
                                 self.op.instance_name)
2224
    if instance.disk_template == constants.DT_DISKLESS:
2225
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2226
                                 self.op.instance_name)
2227
    if instance.status != "down":
2228
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2229
                                 self.op.instance_name)
2230
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2231
    if remote_info:
2232
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2233
                                 (self.op.instance_name,
2234
                                  instance.primary_node))
2235

    
2236
    self.op.os_type = getattr(self.op, "os_type", None)
2237
    if self.op.os_type is not None:
2238
      # OS verification
2239
      pnode = self.cfg.GetNodeInfo(
2240
        self.cfg.ExpandNodeName(instance.primary_node))
2241
      if pnode is None:
2242
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2243
                                   instance.primary_node)
2244
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2245
      if not os_obj:
2246
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2247
                                   " primary node"  % self.op.os_type)
2248

    
2249
    self.instance = instance
2250

    
2251
  def Exec(self, feedback_fn):
2252
    """Reinstall the instance.
2253

2254
    """
2255
    inst = self.instance
2256

    
2257
    if self.op.os_type is not None:
2258
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2259
      inst.os = self.op.os_type
2260
      self.cfg.AddInstance(inst)
2261

    
2262
    _StartInstanceDisks(self.cfg, inst, None)
2263
    try:
2264
      feedback_fn("Running the instance OS create scripts...")
2265
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2266
        raise errors.OpExecError("Could not install OS for instance %s"
2267
                                 " on node %s" %
2268
                                 (inst.name, inst.primary_node))
2269
    finally:
2270
      _ShutdownInstanceDisks(inst, self.cfg)
2271

    
2272

    
2273
class LURenameInstance(LogicalUnit):
2274
  """Rename an instance.
2275

2276
  """
2277
  HPATH = "instance-rename"
2278
  HTYPE = constants.HTYPE_INSTANCE
2279
  _OP_REQP = ["instance_name", "new_name"]
2280

    
2281
  def BuildHooksEnv(self):
2282
    """Build hooks env.
2283

2284
    This runs on master, primary and secondary nodes of the instance.
2285

2286
    """
2287
    env = _BuildInstanceHookEnvByObject(self.instance)
2288
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2289
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2290
          list(self.instance.secondary_nodes))
2291
    return env, nl, nl
2292

    
2293
  def CheckPrereq(self):
2294
    """Check prerequisites.
2295

2296
    This checks that the instance is in the cluster and is not running.
2297

2298
    """
2299
    instance = self.cfg.GetInstanceInfo(
2300
      self.cfg.ExpandInstanceName(self.op.instance_name))
2301
    if instance is None:
2302
      raise errors.OpPrereqError("Instance '%s' not known" %
2303
                                 self.op.instance_name)
2304
    if instance.status != "down":
2305
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2306
                                 self.op.instance_name)
2307
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2308
    if remote_info:
2309
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2310
                                 (self.op.instance_name,
2311
                                  instance.primary_node))
2312
    self.instance = instance
2313

    
2314
    # new name verification
2315
    name_info = utils.HostInfo(self.op.new_name)
2316

    
2317
    self.op.new_name = new_name = name_info.name
2318
    instance_list = self.cfg.GetInstanceList()
2319
    if new_name in instance_list:
2320
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2321
                                 new_name)
2322

    
2323
    if not getattr(self.op, "ignore_ip", False):
2324
      command = ["fping", "-q", name_info.ip]
2325
      result = utils.RunCmd(command)
2326
      if not result.failed:
2327
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2328
                                   (name_info.ip, new_name))
2329

    
2330

    
2331
  def Exec(self, feedback_fn):
2332
    """Reinstall the instance.
2333

2334
    """
2335
    inst = self.instance
2336
    old_name = inst.name
2337

    
2338
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2339

    
2340
    # re-read the instance from the configuration after rename
2341
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2342

    
2343
    _StartInstanceDisks(self.cfg, inst, None)
2344
    try:
2345
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2346
                                          "sda", "sdb"):
2347
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2348
               " instance has been renamed in Ganeti)" %
2349
               (inst.name, inst.primary_node))
2350
        logger.Error(msg)
2351
    finally:
2352
      _ShutdownInstanceDisks(inst, self.cfg)
2353

    
2354

    
2355
class LURemoveInstance(LogicalUnit):
2356
  """Remove an instance.
2357

2358
  """
2359
  HPATH = "instance-remove"
2360
  HTYPE = constants.HTYPE_INSTANCE
2361
  _OP_REQP = ["instance_name"]
2362

    
2363
  def BuildHooksEnv(self):
2364
    """Build hooks env.
2365

2366
    This runs on master, primary and secondary nodes of the instance.
2367

2368
    """
2369
    env = _BuildInstanceHookEnvByObject(self.instance)
2370
    nl = [self.sstore.GetMasterNode()]
2371
    return env, nl, nl
2372

    
2373
  def CheckPrereq(self):
2374
    """Check prerequisites.
2375

2376
    This checks that the instance is in the cluster.
2377

2378
    """
2379
    instance = self.cfg.GetInstanceInfo(
2380
      self.cfg.ExpandInstanceName(self.op.instance_name))
2381
    if instance is None:
2382
      raise errors.OpPrereqError("Instance '%s' not known" %
2383
                                 self.op.instance_name)
2384
    self.instance = instance
2385

    
2386
  def Exec(self, feedback_fn):
2387
    """Remove the instance.
2388

2389
    """
2390
    instance = self.instance
2391
    logger.Info("shutting down instance %s on node %s" %
2392
                (instance.name, instance.primary_node))
2393

    
2394
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2395
      if self.op.ignore_failures:
2396
        feedback_fn("Warning: can't shutdown instance")
2397
      else:
2398
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2399
                                 (instance.name, instance.primary_node))
2400

    
2401
    logger.Info("removing block devices for instance %s" % instance.name)
2402

    
2403
    if not _RemoveDisks(instance, self.cfg):
2404
      if self.op.ignore_failures:
2405
        feedback_fn("Warning: can't remove instance's disks")
2406
      else:
2407
        raise errors.OpExecError("Can't remove instance's disks")
2408

    
2409
    logger.Info("removing instance %s out of cluster config" % instance.name)
2410

    
2411
    self.cfg.RemoveInstance(instance.name)
2412

    
2413

    
2414
class LUQueryInstances(NoHooksLU):
2415
  """Logical unit for querying instances.
2416

2417
  """
2418
  _OP_REQP = ["output_fields", "names"]
2419

    
2420
  def CheckPrereq(self):
2421
    """Check prerequisites.
2422

2423
    This checks that the fields required are valid output fields.
2424

2425
    """
2426
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2427
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2428
                               "admin_state", "admin_ram",
2429
                               "disk_template", "ip", "mac", "bridge",
2430
                               "sda_size", "sdb_size", "vcpus"],
2431
                       dynamic=self.dynamic_fields,
2432
                       selected=self.op.output_fields)
2433

    
2434
    self.wanted = _GetWantedInstances(self, self.op.names)
2435

    
2436
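  # Example request (illustrative): output_fields=["name", "pnode",
  # "oper_state"] mixes static fields, answered from the configuration
  # alone, with a dynamic one, which forces the
  # rpc.call_all_instances_info() query in Exec below.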
  def Exec(self, feedback_fn):
2437
    """Computes the list of nodes and their attributes.
2438

2439
    """
2440
    instance_names = self.wanted
2441
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2442
                     in instance_names]
2443

    
2444
    # begin data gathering
2445

    
2446
    nodes = frozenset([inst.primary_node for inst in instance_list])
2447

    
2448
    bad_nodes = []
2449
    if self.dynamic_fields.intersection(self.op.output_fields):
2450
      live_data = {}
2451
      node_data = rpc.call_all_instances_info(nodes)
2452
      for name in nodes:
2453
        result = node_data[name]
2454
        if result:
2455
          live_data.update(result)
2456
        elif result == False:
2457
          bad_nodes.append(name)
2458
        # else no instance is alive
2459
    else:
2460
      live_data = dict([(name, {}) for name in instance_names])
2461

    
2462
    # end data gathering
2463

    
2464
    output = []
2465
    for instance in instance_list:
2466
      iout = []
2467
      for field in self.op.output_fields:
2468
        if field == "name":
2469
          val = instance.name
2470
        elif field == "os":
2471
          val = instance.os
2472
        elif field == "pnode":
2473
          val = instance.primary_node
2474
        elif field == "snodes":
2475
          val = list(instance.secondary_nodes)
2476
        elif field == "admin_state":
2477
          val = (instance.status != "down")
2478
        elif field == "oper_state":
2479
          if instance.primary_node in bad_nodes:
2480
            val = None
2481
          else:
2482
            val = bool(live_data.get(instance.name))
2483
        elif field == "status":
2484
          if instance.primary_node in bad_nodes:
2485
            val = "ERROR_nodedown"
2486
          else:
2487
            running = bool(live_data.get(instance.name))
2488
            if running:
2489
              if instance.status != "down":
2490
                val = "running"
2491
              else:
2492
                val = "ERROR_up"
2493
            else:
2494
              if instance.status != "down":
2495
                val = "ERROR_down"
2496
              else:
2497
                val = "ADMIN_down"
2498
        elif field == "admin_ram":
2499
          val = instance.memory
2500
        elif field == "oper_ram":
2501
          if instance.primary_node in bad_nodes:
2502
            val = None
2503
          elif instance.name in live_data:
2504
            val = live_data[instance.name].get("memory", "?")
2505
          else:
2506
            val = "-"
2507
        elif field == "disk_template":
2508
          val = instance.disk_template
2509
        elif field == "ip":
2510
          val = instance.nics[0].ip
2511
        elif field == "bridge":
2512
          val = instance.nics[0].bridge
2513
        elif field == "mac":
2514
          val = instance.nics[0].mac
2515
        elif field == "sda_size" or field == "sdb_size":
2516
          disk = instance.FindDisk(field[:3])
2517
          if disk is None:
2518
            val = None
2519
          else:
2520
            val = disk.size
2521
        elif field == "vcpus":
2522
          val = instance.vcpus
2523
        else:
2524
          raise errors.ParameterError(field)
2525
        iout.append(val)
2526
      output.append(iout)
2527

    
2528
    return output
2529

    
2530

    
2531
class LUFailoverInstance(LogicalUnit):
2532
  """Failover an instance.
2533

2534
  """
2535
  HPATH = "instance-failover"
2536
  HTYPE = constants.HTYPE_INSTANCE
2537
  _OP_REQP = ["instance_name", "ignore_consistency"]
2538

    
2539
  def BuildHooksEnv(self):
2540
    """Build hooks env.
2541

2542
    This runs on master, primary and secondary nodes of the instance.
2543

2544
    """
2545
    env = {
2546
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2547
      }
2548
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2549
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2550
    return env, nl, nl
2551

    
2552
  def CheckPrereq(self):
2553
    """Check prerequisites.
2554

2555
    This checks that the instance is in the cluster.
2556

2557
    """
2558
    instance = self.cfg.GetInstanceInfo(
2559
      self.cfg.ExpandInstanceName(self.op.instance_name))
2560
    if instance is None:
2561
      raise errors.OpPrereqError("Instance '%s' not known" %
2562
                                 self.op.instance_name)
2563

    
2564
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2565
      raise errors.OpPrereqError("Instance's disk layout is not"
2566
                                 " network mirrored, cannot failover.")
2567

    
2568
    secondary_nodes = instance.secondary_nodes
2569
    if not secondary_nodes:
2570
      raise errors.ProgrammerError("no secondary node but using "
2571
                                   "DT_REMOTE_RAID1 template")
2572

    
2573
    target_node = secondary_nodes[0]
2574
    # check memory requirements on the secondary node
2575
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2576
                         instance.name, instance.memory)
2577

    
2578
    # check bridge existence
2579
    brlist = [nic.bridge for nic in instance.nics]
2580
    if not rpc.call_bridges_exist(target_node, brlist):
2581
      raise errors.OpPrereqError("One or more target bridges %s does not"
2582
                                 " exist on destination node '%s'" %
2583
                                 (brlist, target_node))
2584

    
2585
    self.instance = instance
2586

    
2587
  def Exec(self, feedback_fn):
2588
    """Failover an instance.
2589

2590
    The failover is done by shutting it down on its present node and
2591
    starting it on the secondary.
2592

2593
    """
2594
    instance = self.instance
2595

    
2596
    source_node = instance.primary_node
2597
    target_node = instance.secondary_nodes[0]
2598

    
2599
    feedback_fn("* checking disk consistency between source and target")
2600
    for dev in instance.disks:
2601
      # for remote_raid1, these are md over drbd
2602
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2603
        if not self.op.ignore_consistency:
2604
          raise errors.OpExecError("Disk %s is degraded on target node,"
2605
                                   " aborting failover." % dev.iv_name)
2606

    
2607
    feedback_fn("* shutting down instance on source node")
2608
    logger.Info("Shutting down instance %s on node %s" %
2609
                (instance.name, source_node))
2610

    
2611
    if not rpc.call_instance_shutdown(source_node, instance):
2612
      if self.op.ignore_consistency:
2613
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2614
                     " anyway. Please make sure node %s is down"  %
2615
                     (instance.name, source_node, source_node))
2616
      else:
2617
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2618
                                 (instance.name, source_node))
2619

    
2620
    feedback_fn("* deactivating the instance's disks on source node")
2621
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2622
      raise errors.OpExecError("Can't shut down the instance's disks.")
2623

    
2624
    instance.primary_node = target_node
2625
    # distribute new instance config to the other nodes
2626
    self.cfg.AddInstance(instance)
2627

    
2628
    feedback_fn("* activating the instance's disks on target node")
2629
    logger.Info("Starting instance %s on node %s" %
2630
                (instance.name, target_node))
2631

    
2632
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2633
                                             ignore_secondaries=True)
2634
    if not disks_ok:
2635
      _ShutdownInstanceDisks(instance, self.cfg)
2636
      raise errors.OpExecError("Can't activate the instance's disks")
2637

    
2638
    feedback_fn("* starting the instance on the target node")
2639
    if not rpc.call_instance_start(target_node, instance, None):
2640
      _ShutdownInstanceDisks(instance, self.cfg)
2641
      raise errors.OpExecError("Could not start instance %s on node %s." %
2642
                               (instance.name, target_node))
2643

    
2644

    
2645
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2646
  """Create a tree of block devices on the primary node.
2647

2648
  This always creates all devices.
2649

2650
  """
2651
  if device.children:
2652
    for child in device.children:
2653
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2654
        return False
2655

    
2656
  cfg.SetDiskID(device, node)
2657
  new_id = rpc.call_blockdev_create(node, device, device.size,
2658
                                    instance.name, True, info)
2659
  if not new_id:
2660
    return False
2661
  if device.physical_id is None:
2662
    device.physical_id = new_id
2663
  return True
2664

    
2665

    
2666
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2667
  """Create a tree of block devices on a secondary node.
2668

2669
  If this device type has to be created on secondaries, create it and
2670
  all its children.
2671

2672
  If not, just recurse to children keeping the same 'force' value.
2673

2674
  """
2675
  if device.CreateOnSecondary():
2676
    force = True
2677
  if device.children:
2678
    for child in device.children:
2679
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2680
                                        child, force, info):
2681
        return False
2682

    
2683
  if not force:
2684
    return True
2685
  cfg.SetDiskID(device, node)
2686
  new_id = rpc.call_blockdev_create(node, device, device.size,
2687
                                    instance.name, False, info)
2688
  if not new_id:
2689
    return False
2690
  if device.physical_id is None:
2691
    device.physical_id = new_id
2692
  return True
2693

    
2694

    
2695
def _GenerateUniqueNames(cfg, exts):
2696
  """Generate a suitable LV name.
2697

2698
  This will generate a logical volume name for the given instance.
2699

2700
  """
2701
  results = []
2702
  for val in exts:
2703
    new_id = cfg.GenerateUniqueID()
2704
    results.append("%s%s" % (new_id, val))
2705
  return results
2706
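# Example of the naming scheme (illustrative; the unique prefix comes
# from cfg.GenerateUniqueID(), so the exact value differs per call):
#
#   names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
#   # -> ["<unique-id>.sda", "<unique-id>.sdb"]
#
# These strings are later used as LV names inside the cluster VG.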

    
2707

    
2708
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2709
  """Generate a drbd device complete with its children.
2710

2711
  """
2712
  port = cfg.AllocatePort()
2713
  vgname = cfg.GetVGName()
2714
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2715
                          logical_id=(vgname, names[0]))
2716
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2717
                          logical_id=(vgname, names[1]))
2718
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2719
                          logical_id = (primary, secondary, port),
2720
                          children = [dev_data, dev_meta])
2721
  return drbd_dev
2722

    
2723

    
2724
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2725
  """Generate a drbd8 device complete with its children.
2726

2727
  """
2728
  port = cfg.AllocatePort()
2729
  vgname = cfg.GetVGName()
2730
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2731
                          logical_id=(vgname, names[0]))
2732
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2733
                          logical_id=(vgname, names[1]))
2734
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2735
                          logical_id = (primary, secondary, port),
2736
                          children = [dev_data, dev_meta],
2737
                          iv_name=iv_name)
2738
  return drbd_dev
2739

    
2740

    
2741
def _GenerateDiskTemplate(cfg, template_name,
2742
                          instance_name, primary_node,
2743
                          secondary_nodes, disk_sz, swap_sz):
2744
  """Generate the entire disk layout for a given template type.
2745

2746
  """
2747
  #TODO: compute space requirements
2748

    
2749
  vgname = cfg.GetVGName()
2750
  if template_name == constants.DT_DISKLESS:
2751
    disks = []
2752
  elif template_name == constants.DT_PLAIN:
2753
    if len(secondary_nodes) != 0:
2754
      raise errors.ProgrammerError("Wrong template configuration")
2755

    
2756
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2757
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2758
                           logical_id=(vgname, names[0]),
2759
                           iv_name = "sda")
2760
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2761
                           logical_id=(vgname, names[1]),
2762
                           iv_name = "sdb")
2763
    disks = [sda_dev, sdb_dev]
2764
  elif template_name == constants.DT_LOCAL_RAID1:
2765
    if len(secondary_nodes) != 0:
2766
      raise errors.ProgrammerError("Wrong template configuration")
2767

    
2768

    
2769
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2770
                                       ".sdb_m1", ".sdb_m2"])
2771
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2772
                              logical_id=(vgname, names[0]))
2773
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2774
                              logical_id=(vgname, names[1]))
2775
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2776
                              size=disk_sz,
2777
                              children = [sda_dev_m1, sda_dev_m2])
2778
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2779
                              logical_id=(vgname, names[2]))
2780
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2781
                              logical_id=(vgname, names[3]))
2782
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2783
                              size=swap_sz,
2784
                              children = [sdb_dev_m1, sdb_dev_m2])
2785
    disks = [md_sda_dev, md_sdb_dev]
2786
  elif template_name == constants.DT_REMOTE_RAID1:
2787
    if len(secondary_nodes) != 1:
2788
      raise errors.ProgrammerError("Wrong template configuration")
2789
    remote_node = secondary_nodes[0]
2790
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2791
                                       ".sdb_data", ".sdb_meta"])
2792
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2793
                                         disk_sz, names[0:2])
2794
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2795
                              children = [drbd_sda_dev], size=disk_sz)
2796
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2797
                                         swap_sz, names[2:4])
2798
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2799
                              children = [drbd_sdb_dev], size=swap_sz)
2800
    disks = [md_sda_dev, md_sdb_dev]
2801
  elif template_name == constants.DT_DRBD8:
2802
    if len(secondary_nodes) != 1:
2803
      raise errors.ProgrammerError("Wrong template configuration")
2804
    remote_node = secondary_nodes[0]
2805
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2806
                                       ".sdb_data", ".sdb_meta"])
2807
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2808
                                         disk_sz, names[0:2], "sda")
2809
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2810
                                         swap_sz, names[2:4], "sdb")
2811
    disks = [drbd_sda_dev, drbd_sdb_dev]
2812
  else:
2813
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2814
  return disks
2815
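# Illustrative sketch of the drbd8 case (names and sizes are made up;
# sizes are MB, as elsewhere in this module):
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_DRBD8,
#                                 "inst1.example.com", "node1.example.com",
#                                 ["node2.example.com"], 10240, 4096)
#
# This returns two LD_DRBD8 disks (iv_name "sda" and "sdb"), each with
# an LV data child of the requested size and a 128 MB LV meta child.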

    
2816

    
2817
def _GetInstanceInfoText(instance):
2818
  """Compute that text that should be added to the disk's metadata.
2819

2820
  """
2821
  return "originstname+%s" % instance.name
2822

    
2823

    
2824
def _CreateDisks(cfg, instance):
2825
  """Create all disks for an instance.
2826

2827
  This abstracts away some work from AddInstance.
2828

2829
  Args:
2830
    instance: the instance object
2831

2832
  Returns:
2833
    True or False showing the success of the creation process
2834

2835
  """
2836
  info = _GetInstanceInfoText(instance)
2837

    
2838
  for device in instance.disks:
2839
    logger.Info("creating volume %s for instance %s" %
2840
              (device.iv_name, instance.name))
2841
    #HARDCODE
2842
    for secondary_node in instance.secondary_nodes:
2843
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2844
                                        device, False, info):
2845
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2846
                     (device.iv_name, device, secondary_node))
2847
        return False
2848
    #HARDCODE
2849
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2850
                                    instance, device, info):
2851
      logger.Error("failed to create volume %s on primary!" %
2852
                   device.iv_name)
2853
      return False
2854
  return True
2855

    
2856

    
2857
def _RemoveDisks(instance, cfg):
2858
  """Remove all disks for an instance.
2859

2860
  This abstracts away some work from `AddInstance()` and
2861
  `RemoveInstance()`. Note that in case some of the devices couldn't
2862
  be removed, the removal will continue with the other ones (compare
2863
  with `_CreateDisks()`).
2864

2865
  Args:
2866
    instance: the instance object
2867

2868
  Returns:
2869
    True or False showing the success of the removal process
2870

2871
  """
2872
  logger.Info("removing block devices for instance %s" % instance.name)
2873

    
2874
  result = True
2875
  for device in instance.disks:
2876
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2877
      cfg.SetDiskID(disk, node)
2878
      if not rpc.call_blockdev_remove(node, disk):
2879
        logger.Error("could not remove block device %s on node %s,"
2880
                     " continuing anyway" %
2881
                     (device.iv_name, node))
2882
        result = False
2883
  return result
2884

    
2885

    
2886
class LUCreateInstance(LogicalUnit):
2887
  """Create an instance.
2888

2889
  """
2890
  HPATH = "instance-add"
2891
  HTYPE = constants.HTYPE_INSTANCE
2892
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2893
              "disk_template", "swap_size", "mode", "start", "vcpus",
2894
              "wait_for_sync", "ip_check", "mac"]
2895

    
2896
  def BuildHooksEnv(self):
2897
    """Build hooks env.
2898

2899
    This runs on master, primary and secondary nodes of the instance.
2900

2901
    """
2902
    env = {
2903
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2904
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2905
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2906
      "INSTANCE_ADD_MODE": self.op.mode,
2907
      }
2908
    if self.op.mode == constants.INSTANCE_IMPORT:
2909
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2910
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2911
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2912

    
2913
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2914
      primary_node=self.op.pnode,
2915
      secondary_nodes=self.secondaries,
2916
      status=self.instance_status,
2917
      os_type=self.op.os_type,
2918
      memory=self.op.mem_size,
2919
      vcpus=self.op.vcpus,
2920
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2921
    ))
2922

    
2923
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2924
          self.secondaries)
2925
    return env, nl, nl
2926

    
2927

    
2928
  def CheckPrereq(self):
2929
    """Check prerequisites.
2930

2931
    """
2932
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2933
      if not hasattr(self.op, attr):
2934
        setattr(self.op, attr, None)
2935

    
2936
    if self.op.mode not in (constants.INSTANCE_CREATE,
2937
                            constants.INSTANCE_IMPORT):
2938
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2939
                                 self.op.mode)
2940

    
2941
    if self.op.mode == constants.INSTANCE_IMPORT:
2942
      src_node = getattr(self.op, "src_node", None)
2943
      src_path = getattr(self.op, "src_path", None)
2944
      if src_node is None or src_path is None:
2945
        raise errors.OpPrereqError("Importing an instance requires source"
2946
                                   " node and path options")
2947
      src_node_full = self.cfg.ExpandNodeName(src_node)
2948
      if src_node_full is None:
2949
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2950
      self.op.src_node = src_node = src_node_full
2951

    
2952
      if not os.path.isabs(src_path):
2953
        raise errors.OpPrereqError("The source path must be absolute")
2954

    
2955
      export_info = rpc.call_export_info(src_node, src_path)
2956

    
2957
      if not export_info:
2958
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2959

    
2960
      if not export_info.has_section(constants.INISECT_EXP):
2961
        raise errors.ProgrammerError("Corrupted export config")
2962

    
2963
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2964
      if (int(ei_version) != constants.EXPORT_VERSION):
2965
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2966
                                   (ei_version, constants.EXPORT_VERSION))
2967

    
2968
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2969
        raise errors.OpPrereqError("Can't import instance with more than"
2970
                                   " one data disk")
2971

    
2972
      # FIXME: are the old os-es, disk sizes, etc. useful?
2973
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2974
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2975
                                                         'disk0_dump'))
2976
      self.src_image = diskimage
2977
    else: # INSTANCE_CREATE
2978
      if getattr(self.op, "os_type", None) is None:
2979
        raise errors.OpPrereqError("No guest OS specified")
2980

    
2981
    # check primary node
2982
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2983
    if pnode is None:
2984
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2985
                                 self.op.pnode)
2986
    self.op.pnode = pnode.name
2987
    self.pnode = pnode
2988
    self.secondaries = []
2989
    # disk template and mirror node verification
2990
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2991
      raise errors.OpPrereqError("Invalid disk template name")
2992

    
2993
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2994
      if getattr(self.op, "snode", None) is None:
2995
        raise errors.OpPrereqError("The networked disk templates need"
2996
                                   " a mirror node")
2997

    
2998
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2999
      if snode_name is None:
3000
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3001
                                   self.op.snode)
3002
      elif snode_name == pnode.name:
3003
        raise errors.OpPrereqError("The secondary node cannot be"
3004
                                   " the primary node.")
3005
      self.secondaries.append(snode_name)
3006

    
3007
    # Required free disk space as a function of disk and swap space
3008
    req_size_dict = {
3009
      constants.DT_DISKLESS: None,
3010
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
3011
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
3012
      # 256 MB are added for drbd metadata, 128MB for each drbd device
3013
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
3014
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
3015
    }
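    # Example (hypothetical sizes): a drbd8 instance with disk_size=10240 and
    # swap_size=4096 needs 10240 + 4096 + 256 = 14592 MB of free space in the
    # volume group of the primary node and of the secondary node.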

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError("Disk template '%s' size requirement"
                                   " is unknown" %  self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # instance verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip
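    # In short: ip="auto" uses the address the instance's hostname resolves
    # to, ip="none" (or an omitted ip) leaves the NIC without an IP, and any
    # other value must be a literal, valid IP address.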

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)
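    # The returned string is hypervisor specific; for Xen it is typically
    # something like "xm console <instance name>" (example only, the real
    # value comes from the hypervisor abstraction above).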
    # build ssh cmdline
    argv = ["ssh", "-q", "-t"]
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
    argv.extend(ssh.BATCH_MODE_OPTS)
    argv.append(node)
    argv.append(console_cmd)
    return "ssh", argv


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave devices."
                                 " This would create a 3-disk raid1 which we"
                                 " don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)
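    # e.g. for disk "sda" lv_names is [".sda_data", ".sda_meta"]; the unique
    # names derived from them become the backing LVs of the new DRBD device
    # (names shown for illustration only).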

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
                                    instance, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance, self.proc)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechildren(instance.primary_node,
                                            disk, [child]):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
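      # e.g. ren_fn(lv, 1199145600) maps ("xenvg", "<uuid>.sda_data") to
      # ("xenvg", "<uuid>.sda_data_replaced-1199145600"); the volume group
      # and LV names here are purely illustrative.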
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv,
                    hint="manually cleanup unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # On one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs matching the search pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
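    # "<=" on frozensets/sets is a subset test: every tag requested for
    # removal must already be present on the target, otherwise we fail with
    # the list of missing tags.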
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))