lib/cmdlib.py @ 1ff08570
#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    will be handled by the hooks runner. Also note that additional keys
    will be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). If there
    are no nodes, an empty list (and not None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


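# Illustrative sketch only (not part of the original module): a minimal
# LogicalUnit subclass following the rules above could look like this,
# assuming a hypothetical opcode with a single required 'message' field:
#
#   class LUDummy(LogicalUnit):
#     HPATH = "dummy"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["message"]
#
#     def BuildHooksEnv(self):
#       env = {"OP_TARGET": self.op.message}
#       mn = self.sstore.GetMasterNode()
#       return env, [mn], [mn]
#
#     def CheckPrereq(self):
#       if not self.op.message:
#         raise errors.OpPrereqError("Empty message given")
#
#     def Exec(self, feedback_fn):
#       feedback_fn(self.op.message)
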
class NoHooksLU(LogicalUnit):
161
  """Simple LU which runs no hooks.
162

163
  This LU is intended as a parent for other LogicalUnits which will
164
  run no hooks, in order to reduce duplicate code.
165

166
  """
167
  HPATH = None
168
  HTYPE = None
169

    
170
  def BuildHooksEnv(self):
171
    """Build hooks env.
172

173
    This is a no-op, since we don't run hooks.
174

175
    """
176
    return {}, [], []
177

    
178

    
179
def _AddHostToEtcHosts(hostname):
180
  """Wrapper around utils.SetEtcHostsEntry.
181

182
  """
183
  hi = utils.HostInfo(name=hostname)
184
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
185

    
186

    
187
def _RemoveHostFromEtcHosts(hostname):
188
  """Wrapper around utils.RemoveEtcHostsEntry.
189

190
  """
191
  hi = utils.HostInfo(name=hostname)
192
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
193
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
194

    
195

    
196
def _GetWantedNodes(lu, nodes):
197
  """Returns list of checked and expanded node names.
198

199
  Args:
200
    nodes: List of nodes (strings) or empty list for all
201

202
  """
203
  if not isinstance(nodes, list):
204
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
205

    
206
  if nodes:
207
    wanted = []
208

    
209
    for name in nodes:
210
      node = lu.cfg.ExpandNodeName(name)
211
      if node is None:
212
        raise errors.OpPrereqError("No such node name '%s'" % name)
213
      wanted.append(node)
214

    
215
  else:
216
    wanted = lu.cfg.GetNodeList()
217
  return utils.NiceSort(wanted)
218

    
219

    
220
def _GetWantedInstances(lu, instances):
221
  """Returns list of checked and expanded instance names.
222

223
  Args:
224
    instances: List of instances (strings) or empty list for all
225

226
  """
227
  if not isinstance(instances, list):
228
    raise errors.OpPrereqError("Invalid argument type 'instances'")
229

    
230
  if instances:
231
    wanted = []
232

    
233
    for name in instances:
234
      instance = lu.cfg.ExpandInstanceName(name)
235
      if instance is None:
236
        raise errors.OpPrereqError("No such instance name '%s'" % name)
237
      wanted.append(instance)
238

    
239
  else:
240
    wanted = lu.cfg.GetInstanceList()
241
  return utils.NiceSort(wanted)
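
# Note for both helpers above: an empty 'nodes'/'instances' list selects all
# known names, and the result is always the expanded (canonical) names in
# NiceSort order.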
242

    
243

    
244
def _CheckOutputFields(static, dynamic, selected):
245
  """Checks whether all selected fields are valid.
246

247
  Args:
248
    static: Static fields
249
    dynamic: Dynamic fields
    selected: Fields selected by the caller
250

251
  """
252
  static_fields = frozenset(static)
253
  dynamic_fields = frozenset(dynamic)
254

    
255
  all_fields = static_fields | dynamic_fields
256

    
257
  if not all_fields.issuperset(selected):
258
    raise errors.OpPrereqError("Unknown output fields selected: %s"
259
                               % ",".join(frozenset(selected).
260
                                          difference(all_fields)))
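
# For example (illustrative values), a query LU calls the helper above as
#   _CheckOutputFields(static=["name", "pip"], dynamic=["mfree"],
#                      selected=self.op.output_fields)
# and any selected field that is neither static nor dynamic is reported in
# the resulting OpPrereqError.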
261

    
262

    
263
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
264
                          memory, vcpus, nics):
265
  """Builds instance related env variables for hooks from single variables.
266

267
  Args:
268
    secondary_nodes: List of secondary nodes as strings
269
  """
270
  env = {
271
    "OP_TARGET": name,
272
    "INSTANCE_NAME": name,
273
    "INSTANCE_PRIMARY": primary_node,
274
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
275
    "INSTANCE_OS_TYPE": os_type,
276
    "INSTANCE_STATUS": status,
277
    "INSTANCE_MEMORY": memory,
278
    "INSTANCE_VCPUS": vcpus,
279
  }
280

    
281
  if nics:
282
    nic_count = len(nics)
283
    for idx, (ip, bridge, mac) in enumerate(nics):
284
      if ip is None:
285
        ip = ""
286
      env["INSTANCE_NIC%d_IP" % idx] = ip
287
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
288
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
289
  else:
290
    nic_count = 0
291

    
292
  env["INSTANCE_NIC_COUNT"] = nic_count
293

    
294
  return env
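
# As an illustration (hypothetical values), a single-NIC instance yields an
# environment along the lines of:
#   OP_TARGET=inst1.example.com       INSTANCE_PRIMARY=node1.example.com
#   INSTANCE_MEMORY=512               INSTANCE_VCPUS=1
#   INSTANCE_NIC_COUNT=1              INSTANCE_NIC0_BRIDGE=xen-br0
# The hooks runner later adds the GANETI_ prefix to each key.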
295

    
296

    
297
def _BuildInstanceHookEnvByObject(instance, override=None):
298
  """Builds instance related env variables for hooks from an object.
299

300
  Args:
301
    instance: objects.Instance object of instance
302
    override: dict of values to override
303
  """
304
  args = {
305
    'name': instance.name,
306
    'primary_node': instance.primary_node,
307
    'secondary_nodes': instance.secondary_nodes,
308
    'os_type': instance.os,
309
    'status': instance.status,
310
    'memory': instance.memory,
311
    'vcpus': instance.vcpus,
312
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
313
  }
314
  if override:
315
    args.update(override)
316
  return _BuildInstanceHookEnv(**args)
317

    
318

    
319
def _UpdateKnownHosts(fullnode, ip, pubkey):
320
  """Ensure a node has a correct known_hosts entry.
321

322
  Args:
323
    fullnode - Fully qualified domain name of host. (str)
324
    ip       - IPv4 address of host (str)
325
    pubkey   - the public key of the cluster
326

327
  """
328
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
329
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
330
  else:
331
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
332

    
333
  inthere = False
334

    
335
  save_lines = []
336
  add_lines = []
337
  removed = False
338

    
339
  for rawline in f:
340
    logger.Debug('read %s' % (repr(rawline),))
341

    
342
    parts = rawline.rstrip('\r\n').split()
343

    
344
    # Ignore unwanted lines
345
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
346
      fields = parts[0].split(',')
347
      key = parts[2]
348

    
349
      haveall = True
350
      havesome = False
351
      for spec in [ ip, fullnode ]:
352
        if spec not in fields:
353
          haveall = False
354
        if spec in fields:
355
          havesome = True
356

    
357
      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
358
      if haveall and key == pubkey:
359
        inthere = True
360
        save_lines.append(rawline)
361
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
362
        continue
363

    
364
      if havesome and (not haveall or key != pubkey):
365
        removed = True
366
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
367
        continue
368

    
369
    save_lines.append(rawline)
370

    
371
  if not inthere:
372
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
373
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
374

    
375
  if removed:
376
    save_lines = save_lines + add_lines
377

    
378
    # Write a new file and replace old.
379
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
380
                                   constants.DATA_DIR)
381
    newfile = os.fdopen(fd, 'w')
382
    try:
383
      newfile.write(''.join(save_lines))
384
    finally:
385
      newfile.close()
386
    logger.Debug("Wrote new known_hosts.")
387
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
388

    
389
  elif add_lines:
390
    # Simply appending a new line will do the trick.
391
    f.seek(0, 2)
392
    for add in add_lines:
393
      f.write(add)
394

    
395
  f.close()
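
# The entries maintained above use the standard known_hosts format, e.g.
# (hypothetical host, key truncated):
#   node1.example.com,192.0.2.11 ssh-rsa AAAAB3NzaC1yc2E...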
396

    
397

    
398
def _HasValidVG(vglist, vgname):
399
  """Checks if the volume group list is valid.
400

401
  A non-None return value means there's an error, and the return value
402
  is the error message.
403

404
  """
405
  vgsize = vglist.get(vgname, None)
406
  if vgsize is None:
407
    return "volume group '%s' missing" % vgname
408
  elif vgsize < 20480:
409
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
410
            (vgname, vgsize))
411
  return None
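
# Usage sketch (hypothetical values): _HasValidVG({"xenvg": 40960}, "xenvg")
# returns None, while a missing or undersized volume group yields an error
# string that the callers report or wrap in OpPrereqError.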
412

    
413

    
414
def _InitSSHSetup(node):
415
  """Setup the SSH configuration for the cluster.
416

417

418
  This generates a dsa keypair for root, adds the pub key to the
419
  permitted hosts and adds the hostkey to its own known hosts.
420

421
  Args:
422
    node: the name of this host as a fqdn
423

424
  """
425
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
426

    
427
  for name in priv_key, pub_key:
428
    if os.path.exists(name):
429
      utils.CreateBackup(name)
430
    utils.RemoveFile(name)
431

    
432
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
433
                         "-f", priv_key,
434
                         "-q", "-N", ""])
435
  if result.failed:
436
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
437
                             result.output)
438

    
439
  f = open(pub_key, 'r')
440
  try:
441
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
442
  finally:
443
    f.close()
444

    
445

    
446
def _InitGanetiServerSetup(ss):
447
  """Setup the necessary configuration for the initial node daemon.
448

449
  This creates the nodepass file containing the shared password for
450
  the cluster and also generates the SSL certificate.
451

452
  """
453
  # Create pseudo random password
454
  randpass = sha.new(os.urandom(64)).hexdigest()
455
  # and write it into sstore
456
  ss.SetKey(ss.SS_NODED_PASS, randpass)
457

    
458
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
459
                         "-days", str(365*5), "-nodes", "-x509",
460
                         "-keyout", constants.SSL_CERT_FILE,
461
                         "-out", constants.SSL_CERT_FILE, "-batch"])
462
  if result.failed:
463
    raise errors.OpExecError("could not generate server ssl cert, command"
464
                             " %s had exitcode %s and error message %s" %
465
                             (result.cmd, result.exit_code, result.output))
466

    
467
  os.chmod(constants.SSL_CERT_FILE, 0400)
468

    
469
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
470

    
471
  if result.failed:
472
    raise errors.OpExecError("Could not start the node daemon, command %s"
473
                             " had exitcode %s and error %s" %
474
                             (result.cmd, result.exit_code, result.output))
475

    
476

    
477
def _CheckInstanceBridgesExist(instance):
478
  """Check that the brigdes needed by an instance exist.
479

480
  """
481
  # check bridges existance
482
  brlist = [nic.bridge for nic in instance.nics]
483
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
484
    raise errors.OpPrereqError("one or more target bridges %s do not"
485
                               " exist on destination node '%s'" %
486
                               (brlist, instance.primary_node))
487

    
488

    
489
class LUInitCluster(LogicalUnit):
490
  """Initialise the cluster.
491

492
  """
493
  HPATH = "cluster-init"
494
  HTYPE = constants.HTYPE_CLUSTER
495
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
496
              "def_bridge", "master_netdev"]
497
  REQ_CLUSTER = False
498

    
499
  def BuildHooksEnv(self):
500
    """Build hooks env.
501

502
    Notes: Since we don't require a cluster, we must manually add
503
    ourselves in the post-run node list.
504

505
    """
506
    env = {"OP_TARGET": self.op.cluster_name}
507
    return env, [], [self.hostname.name]
508

    
509
  def CheckPrereq(self):
510
    """Verify that the passed name is a valid one.
511

512
    """
513
    if config.ConfigWriter.IsCluster():
514
      raise errors.OpPrereqError("Cluster is already initialised")
515

    
516
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
517
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
518
        raise errors.OpPrereqError("Please prepare the cluster VNC"
519
                                   "password file %s" %
520
                                   constants.VNC_PASSWORD_FILE)
521

    
522
    self.hostname = hostname = utils.HostInfo()
523

    
524
    if hostname.ip.startswith("127."):
525
      raise errors.OpPrereqError("This host's IP resolves to the private"
526
                                 " range (%s). Please fix DNS or %s." %
527
                                 (hostname.ip, constants.ETC_HOSTS))
528

    
529
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
530
                         source=constants.LOCALHOST_IP_ADDRESS):
531
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
532
                                 " to %s,\nbut this ip address does not"
533
                                 " belong to this host."
534
                                 " Aborting." % hostname.ip)
535

    
536
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
537

    
538
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
539
                     timeout=5):
540
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")
541

    
542
    secondary_ip = getattr(self.op, "secondary_ip", None)
543
    if secondary_ip and not utils.IsValidIP(secondary_ip):
544
      raise errors.OpPrereqError("Invalid secondary ip given")
545
    if (secondary_ip and
546
        secondary_ip != hostname.ip and
547
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
548
                           source=constants.LOCALHOST_IP_ADDRESS))):
549
      raise errors.OpPrereqError("You gave %s as secondary IP,"
550
                                 " but it does not belong to this host." %
551
                                 secondary_ip)
552
    self.secondary_ip = secondary_ip
553

    
554
    # checks presence of the volume group given
555
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
556

    
557
    if vgstatus:
558
      raise errors.OpPrereqError("Error: %s" % vgstatus)
559

    
560
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
561
                    self.op.mac_prefix):
562
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
563
                                 self.op.mac_prefix)
564

    
565
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
566
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
567
                                 self.op.hypervisor_type)
568

    
569
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
570
    if result.failed:
571
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
572
                                 (self.op.master_netdev,
573
                                  result.output.strip()))
574

    
575
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
576
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
577
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
578
                                 " executable." % constants.NODE_INITD_SCRIPT)
579

    
580
  def Exec(self, feedback_fn):
581
    """Initialize the cluster.
582

583
    """
584
    clustername = self.clustername
585
    hostname = self.hostname
586

    
587
    # set up the simple store
588
    self.sstore = ss = ssconf.SimpleStore()
589
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
590
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
591
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
592
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
593
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
594

    
595
    # set up the inter-node password and certificate
596
    _InitGanetiServerSetup(ss)
597

    
598
    # start the master ip
599
    rpc.call_node_start_master(hostname.name)
600

    
601
    # set up ssh config and /etc/hosts
602
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
603
    try:
604
      sshline = f.read()
605
    finally:
606
      f.close()
607
    sshkey = sshline.split(" ")[1]
608

    
609
    _AddHostToEtcHosts(hostname.name)
610

    
611
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
612

    
613
    _InitSSHSetup(hostname.name)
614

    
615
    # init of cluster config file
616
    self.cfg = cfgw = config.ConfigWriter()
617
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
618
                    sshkey, self.op.mac_prefix,
619
                    self.op.vg_name, self.op.def_bridge)
620

    
621

    
622
class LUDestroyCluster(NoHooksLU):
623
  """Logical unit for destroying the cluster.
624

625
  """
626
  _OP_REQP = []
627

    
628
  def CheckPrereq(self):
629
    """Check prerequisites.
630

631
    This checks whether the cluster is empty.
632

633
    Any errors are signalled by raising errors.OpPrereqError.
634

635
    """
636
    master = self.sstore.GetMasterNode()
637

    
638
    nodelist = self.cfg.GetNodeList()
639
    if len(nodelist) != 1 or nodelist[0] != master:
640
      raise errors.OpPrereqError("There are still %d node(s) in"
641
                                 " this cluster." % (len(nodelist) - 1))
642
    instancelist = self.cfg.GetInstanceList()
643
    if instancelist:
644
      raise errors.OpPrereqError("There are still %d instance(s) in"
645
                                 " this cluster." % len(instancelist))
646

    
647
  def Exec(self, feedback_fn):
648
    """Destroys the cluster.
649

650
    """
651
    master = self.sstore.GetMasterNode()
652
    if not rpc.call_node_stop_master(master):
653
      raise errors.OpExecError("Could not disable the master role")
654
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
655
    utils.CreateBackup(priv_key)
656
    utils.CreateBackup(pub_key)
657
    rpc.call_node_leave_cluster(master)
658

    
659

    
660
class LUVerifyCluster(NoHooksLU):
661
  """Verifies the cluster status.
662

663
  """
664
  _OP_REQP = []
665

    
666
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
667
                  remote_version, feedback_fn):
668
    """Run multiple tests against a node.
669

670
    Test list:
671
      - compares ganeti version
672
      - checks vg existence and size > 20G
673
      - checks config file checksum
674
      - checks ssh to other nodes
675

676
    Args:
677
      node: name of the node to check
678
      file_list: required list of files
679
      local_cksum: dictionary of local files and their checksums
680

681
    """
682
    # compares ganeti version
683
    local_version = constants.PROTOCOL_VERSION
684
    if not remote_version:
685
      feedback_fn("  - ERROR: connection to %s failed" % (node))
686
      return True
687

    
688
    if local_version != remote_version:
689
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
690
                      (local_version, node, remote_version))
691
      return True
692

    
693
    # checks vg existence and size > 20G
694

    
695
    bad = False
696
    if not vglist:
697
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
698
                      (node,))
699
      bad = True
700
    else:
701
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
702
      if vgstatus:
703
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
704
        bad = True
705

    
706
    # checks config file checksum
707
    # checks ssh to any
708

    
709
    if 'filelist' not in node_result:
710
      bad = True
711
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
712
    else:
713
      remote_cksum = node_result['filelist']
714
      for file_name in file_list:
715
        if file_name not in remote_cksum:
716
          bad = True
717
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
718
        elif remote_cksum[file_name] != local_cksum[file_name]:
719
          bad = True
720
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
721

    
722
    if 'nodelist' not in node_result:
723
      bad = True
724
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
725
    else:
726
      if node_result['nodelist']:
727
        bad = True
728
        for node in node_result['nodelist']:
729
          feedback_fn("  - ERROR: communication with node '%s': %s" %
730
                          (node, node_result['nodelist'][node]))
731
    hyp_result = node_result.get('hypervisor', None)
732
    if hyp_result is not None:
733
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
734
    return bad
735

    
736
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
737
    """Verify an instance.
738

739
    This function checks to see if the required block devices are
740
    available on the instance's node.
741

742
    """
743
    bad = False
744

    
745
    instancelist = self.cfg.GetInstanceList()
746
    if not instance in instancelist:
747
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
748
                      (instance, instancelist))
749
      bad = True
750

    
751
    instanceconfig = self.cfg.GetInstanceInfo(instance)
752
    node_current = instanceconfig.primary_node
753

    
754
    node_vol_should = {}
755
    instanceconfig.MapLVsByNode(node_vol_should)
756

    
757
    for node in node_vol_should:
758
      for volume in node_vol_should[node]:
759
        if node not in node_vol_is or volume not in node_vol_is[node]:
760
          feedback_fn("  - ERROR: volume %s missing on node %s" %
761
                          (volume, node))
762
          bad = True
763

    
764
    if not instanceconfig.status == 'down':
765
      if not instance in node_instance[node_current]:
766
        feedback_fn("  - ERROR: instance %s not running on node %s" %
767
                        (instance, node_current))
768
        bad = True
769

    
770
    for node in node_instance:
771
      if (not node == node_current):
772
        if instance in node_instance[node]:
773
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
774
                          (instance, node))
775
          bad = True
776

    
777
    return bad
778

    
779
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
780
    """Verify if there are any unknown volumes in the cluster.
781

782
    The .os, .swap and backup volumes are ignored. All other volumes are
783
    reported as unknown.
784

785
    """
786
    bad = False
787

    
788
    for node in node_vol_is:
789
      for volume in node_vol_is[node]:
790
        if node not in node_vol_should or volume not in node_vol_should[node]:
791
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
792
                      (volume, node))
793
          bad = True
794
    return bad
795

    
796
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
797
    """Verify the list of running instances.
798

799
    This checks what instances are running but unknown to the cluster.
800

801
    """
802
    bad = False
803
    for node in node_instance:
804
      for runninginstance in node_instance[node]:
805
        if runninginstance not in instancelist:
806
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
807
                          (runninginstance, node))
808
          bad = True
809
    return bad
810

    
811
  def CheckPrereq(self):
812
    """Check prerequisites.
813

814
    This has no prerequisites.
815

816
    """
817
    pass
818

    
819
  def Exec(self, feedback_fn):
820
    """Verify integrity of cluster, performing various test on nodes.
821

822
    """
823
    bad = False
824
    feedback_fn("* Verifying global settings")
825
    for msg in self.cfg.VerifyConfig():
826
      feedback_fn("  - ERROR: %s" % msg)
827

    
828
    vg_name = self.cfg.GetVGName()
829
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
830
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
831
    node_volume = {}
832
    node_instance = {}
833

    
834
    # FIXME: verify OS list
835
    # do local checksums
836
    file_names = list(self.sstore.GetFileList())
837
    file_names.append(constants.SSL_CERT_FILE)
838
    file_names.append(constants.CLUSTER_CONF_FILE)
839
    local_checksums = utils.FingerprintFiles(file_names)
840

    
841
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
842
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
843
    all_instanceinfo = rpc.call_instance_list(nodelist)
844
    all_vglist = rpc.call_vg_list(nodelist)
845
    node_verify_param = {
846
      'filelist': file_names,
847
      'nodelist': nodelist,
848
      'hypervisor': None,
849
      }
850
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
851
    all_rversion = rpc.call_version(nodelist)
852

    
853
    for node in nodelist:
854
      feedback_fn("* Verifying node %s" % node)
855
      result = self._VerifyNode(node, file_names, local_checksums,
856
                                all_vglist[node], all_nvinfo[node],
857
                                all_rversion[node], feedback_fn)
858
      bad = bad or result
859

    
860
      # node_volume
861
      volumeinfo = all_volumeinfo[node]
862

    
863
      if isinstance(volumeinfo, basestring):
864
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
865
                    (node, volumeinfo[-400:].encode('string_escape')))
866
        bad = True
867
        node_volume[node] = {}
868
      elif not isinstance(volumeinfo, dict):
869
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
870
        bad = True
871
        continue
872
      else:
873
        node_volume[node] = volumeinfo
874

    
875
      # node_instance
876
      nodeinstance = all_instanceinfo[node]
877
      if type(nodeinstance) != list:
878
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
879
        bad = True
880
        continue
881

    
882
      node_instance[node] = nodeinstance
883

    
884
    node_vol_should = {}
885

    
886
    for instance in instancelist:
887
      feedback_fn("* Verifying instance %s" % instance)
888
      result =  self._VerifyInstance(instance, node_volume, node_instance,
889
                                     feedback_fn)
890
      bad = bad or result
891

    
892
      inst_config = self.cfg.GetInstanceInfo(instance)
893

    
894
      inst_config.MapLVsByNode(node_vol_should)
895

    
896
    feedback_fn("* Verifying orphan volumes")
897
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
898
                                       feedback_fn)
899
    bad = bad or result
900

    
901
    feedback_fn("* Verifying remaining instances")
902
    result = self._VerifyOrphanInstances(instancelist, node_instance,
903
                                         feedback_fn)
904
    bad = bad or result
905

    
906
    return int(bad)
907

    
908

    
909
class LUVerifyDisks(NoHooksLU):
910
  """Verifies the cluster disks status.
911

912
  """
913
  _OP_REQP = []
914

    
915
  def CheckPrereq(self):
916
    """Check prerequisites.
917

918
    This has no prerequisites.
919

920
    """
921
    pass
922

    
923
  def Exec(self, feedback_fn):
924
    """Verify integrity of cluster disks.
925

926
    """
927
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
928

    
929
    vg_name = self.cfg.GetVGName()
930
    nodes = utils.NiceSort(self.cfg.GetNodeList())
931
    instances = [self.cfg.GetInstanceInfo(name)
932
                 for name in self.cfg.GetInstanceList()]
933

    
934
    nv_dict = {}
935
    for inst in instances:
936
      inst_lvs = {}
937
      if (inst.status != "up" or
938
          inst.disk_template not in constants.DTS_NET_MIRROR):
939
        continue
940
      inst.MapLVsByNode(inst_lvs)
941
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
942
      for node, vol_list in inst_lvs.iteritems():
943
        for vol in vol_list:
944
          nv_dict[(node, vol)] = inst
945

    
946
    if not nv_dict:
947
      return result
948

    
949
    node_lvs = rpc.call_volume_list(nodes, vg_name)
950

    
951
    to_act = set()
952
    for node in nodes:
953
      # node_volume
954
      lvs = node_lvs[node]
955

    
956
      if isinstance(lvs, basestring):
957
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
958
        res_nlvm[node] = lvs
959
      elif not isinstance(lvs, dict):
960
        logger.Info("connection to node %s failed or invalid data returned" %
961
                    (node,))
962
        res_nodes.append(node)
963
        continue
964

    
965
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
966
        inst = nv_dict.pop((node, lv_name), None)
967
        if (not lv_online and inst is not None
968
            and inst.name not in res_instances):
969
          res_instances.append(inst.name)
970

    
971
    # any leftover items in nv_dict are missing LVs, let's arrange the
972
    # data better
973
    for key, inst in nv_dict.iteritems():
974
      if inst.name not in res_missing:
975
        res_missing[inst.name] = []
976
      res_missing[inst.name].append(key)
977

    
978
    return result
979

    
980

    
981
class LURenameCluster(LogicalUnit):
982
  """Rename the cluster.
983

984
  """
985
  HPATH = "cluster-rename"
986
  HTYPE = constants.HTYPE_CLUSTER
987
  _OP_REQP = ["name"]
988

    
989
  def BuildHooksEnv(self):
990
    """Build hooks env.
991

992
    """
993
    env = {
994
      "OP_TARGET": self.sstore.GetClusterName(),
995
      "NEW_NAME": self.op.name,
996
      }
997
    mn = self.sstore.GetMasterNode()
998
    return env, [mn], [mn]
999

    
1000
  def CheckPrereq(self):
1001
    """Verify that the passed name is a valid one.
1002

1003
    """
1004
    hostname = utils.HostInfo(self.op.name)
1005

    
1006
    new_name = hostname.name
1007
    self.ip = new_ip = hostname.ip
1008
    old_name = self.sstore.GetClusterName()
1009
    old_ip = self.sstore.GetMasterIP()
1010
    if new_name == old_name and new_ip == old_ip:
1011
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1012
                                 " cluster has changed")
1013
    if new_ip != old_ip:
1014
      result = utils.RunCmd(["fping", "-q", new_ip])
1015
      if not result.failed:
1016
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1017
                                   " reachable on the network. Aborting." %
1018
                                   new_ip)
1019

    
1020
    self.op.name = new_name
1021

    
1022
  def Exec(self, feedback_fn):
1023
    """Rename the cluster.
1024

1025
    """
1026
    clustername = self.op.name
1027
    ip = self.ip
1028
    ss = self.sstore
1029

    
1030
    # shutdown the master IP
1031
    master = ss.GetMasterNode()
1032
    if not rpc.call_node_stop_master(master):
1033
      raise errors.OpExecError("Could not disable the master role")
1034

    
1035
    try:
1036
      # modify the sstore
1037
      ss.SetKey(ss.SS_MASTER_IP, ip)
1038
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1039

    
1040
      # Distribute updated ss config to all nodes
1041
      myself = self.cfg.GetNodeInfo(master)
1042
      dist_nodes = self.cfg.GetNodeList()
1043
      if myself.name in dist_nodes:
1044
        dist_nodes.remove(myself.name)
1045

    
1046
      logger.Debug("Copying updated ssconf data to all nodes")
1047
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1048
        fname = ss.KeyToFilename(keyname)
1049
        result = rpc.call_upload_file(dist_nodes, fname)
1050
        for to_node in dist_nodes:
1051
          if not result[to_node]:
1052
            logger.Error("copy of file %s to node %s failed" %
1053
                         (fname, to_node))
1054
    finally:
1055
      if not rpc.call_node_start_master(master):
1056
        logger.Error("Could not re-enable the master role on the master,"
1057
                     " please restart manually.")
1058

    
1059

    
1060
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1061
  """Sleep and poll for an instance's disk to sync.
1062

1063
  """
1064
  if not instance.disks:
1065
    return True
1066

    
1067
  if not oneshot:
1068
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1069

    
1070
  node = instance.primary_node
1071

    
1072
  for dev in instance.disks:
1073
    cfgw.SetDiskID(dev, node)
1074

    
1075
  retries = 0
1076
  while True:
1077
    max_time = 0
1078
    done = True
1079
    cumul_degraded = False
1080
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1081
    if not rstats:
1082
      proc.LogWarning("Can't get any data from node %s" % node)
1083
      retries += 1
1084
      if retries >= 10:
1085
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1086
                                 " aborting." % node)
1087
      time.sleep(6)
1088
      continue
1089
    retries = 0
1090
    for i in range(len(rstats)):
1091
      mstat = rstats[i]
1092
      if mstat is None:
1093
        proc.LogWarning("Can't compute data for node %s/%s" %
1094
                        (node, instance.disks[i].iv_name))
1095
        continue
1096
      # we ignore the ldisk parameter
1097
      perc_done, est_time, is_degraded, _ = mstat
1098
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1099
      if perc_done is not None:
1100
        done = False
1101
        if est_time is not None:
1102
          rem_time = "%d estimated seconds remaining" % est_time
1103
          max_time = est_time
1104
        else:
1105
          rem_time = "no time estimate"
1106
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1107
                     (instance.disks[i].iv_name, perc_done, rem_time))
1108
    if done or oneshot:
1109
      break
1110

    
1111
    if unlock:
1112
      utils.Unlock('cmd')
1113
    try:
1114
      time.sleep(min(60, max_time))
1115
    finally:
1116
      if unlock:
1117
        utils.Lock('cmd')
1118

    
1119
  if done:
1120
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1121
  return not cumul_degraded
1122

    
1123

    
1124
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1125
  """Check that mirrors are not degraded.
1126

1127
  The ldisk parameter, if True, will change the test from the
1128
  is_degraded attribute (which represents overall non-ok status for
1129
  the device(s)) to the ldisk (representing the local storage status).
1130

1131
  """
1132
  cfgw.SetDiskID(dev, node)
1133
  if ldisk:
1134
    idx = 6
1135
  else:
1136
    idx = 5
1137

    
1138
  result = True
1139
  if on_primary or dev.AssembleOnSecondary():
1140
    rstats = rpc.call_blockdev_find(node, dev)
1141
    if not rstats:
1142
      logger.ToStderr("Can't get any data from node %s" % node)
1143
      result = False
1144
    else:
1145
      result = result and (not rstats[idx])
1146
  if dev.children:
1147
    for child in dev.children:
1148
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1149

    
1150
  return result
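
# Note: the indices used above assume that the status tuple returned by
# rpc.call_blockdev_find carries the overall is_degraded flag at position 5
# and the ldisk (local storage) status at position 6.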
1151

    
1152

    
1153
class LUDiagnoseOS(NoHooksLU):
1154
  """Logical unit for OS diagnose/query.
1155

1156
  """
1157
  _OP_REQP = []
1158

    
1159
  def CheckPrereq(self):
1160
    """Check prerequisites.
1161

1162
    This always succeeds, since this is a pure query LU.
1163

1164
    """
1165
    return
1166

    
1167
  def Exec(self, feedback_fn):
1168
    """Compute the list of OSes.
1169

1170
    """
1171
    node_list = self.cfg.GetNodeList()
1172
    node_data = rpc.call_os_diagnose(node_list)
1173
    if node_data == False:
1174
      raise errors.OpExecError("Can't gather the list of OSes")
1175
    return node_data
1176

    
1177

    
1178
class LURemoveNode(LogicalUnit):
1179
  """Logical unit for removing a node.
1180

1181
  """
1182
  HPATH = "node-remove"
1183
  HTYPE = constants.HTYPE_NODE
1184
  _OP_REQP = ["node_name"]
1185

    
1186
  def BuildHooksEnv(self):
1187
    """Build hooks env.
1188

1189
    This doesn't run on the target node in the pre phase as a failed
1190
    node would not allow itself to run.
1191

1192
    """
1193
    env = {
1194
      "OP_TARGET": self.op.node_name,
1195
      "NODE_NAME": self.op.node_name,
1196
      }
1197
    all_nodes = self.cfg.GetNodeList()
1198
    all_nodes.remove(self.op.node_name)
1199
    return env, all_nodes, all_nodes
1200

    
1201
  def CheckPrereq(self):
1202
    """Check prerequisites.
1203

1204
    This checks:
1205
     - the node exists in the configuration
1206
     - it does not have primary or secondary instances
1207
     - it's not the master
1208

1209
    Any errors are signalled by raising errors.OpPrereqError.
1210

1211
    """
1212
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1213
    if node is None:
1214
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1215

    
1216
    instance_list = self.cfg.GetInstanceList()
1217

    
1218
    masternode = self.sstore.GetMasterNode()
1219
    if node.name == masternode:
1220
      raise errors.OpPrereqError("Node is the master node,"
1221
                                 " you need to failover first.")
1222

    
1223
    for instance_name in instance_list:
1224
      instance = self.cfg.GetInstanceInfo(instance_name)
1225
      if node.name == instance.primary_node:
1226
        raise errors.OpPrereqError("Instance %s still running on the node,"
1227
                                   " please remove first." % instance_name)
1228
      if node.name in instance.secondary_nodes:
1229
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1230
                                   " please remove first." % instance_name)
1231
    self.op.node_name = node.name
1232
    self.node = node
1233

    
1234
  def Exec(self, feedback_fn):
1235
    """Removes the node from the cluster.
1236

1237
    """
1238
    node = self.node
1239
    logger.Info("stopping the node daemon and removing configs from node %s" %
1240
                node.name)
1241

    
1242
    rpc.call_node_leave_cluster(node.name)
1243

    
1244
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1245

    
1246
    logger.Info("Removing node %s from config" % node.name)
1247

    
1248
    self.cfg.RemoveNode(node.name)
1249

    
1250
    _RemoveHostFromEtcHosts(node.name)
1251

    
1252

    
1253
class LUQueryNodes(NoHooksLU):
1254
  """Logical unit for querying nodes.
1255

1256
  """
1257
  _OP_REQP = ["output_fields", "names"]
1258

    
1259
  def CheckPrereq(self):
1260
    """Check prerequisites.
1261

1262
    This checks that the fields required are valid output fields.
1263

1264
    """
1265
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1266
                                     "mtotal", "mnode", "mfree",
1267
                                     "bootid"])
1268

    
1269
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1270
                               "pinst_list", "sinst_list",
1271
                               "pip", "sip"],
1272
                       dynamic=self.dynamic_fields,
1273
                       selected=self.op.output_fields)
1274

    
1275
    self.wanted = _GetWantedNodes(self, self.op.names)
1276

    
1277
  def Exec(self, feedback_fn):
1278
    """Computes the list of nodes and their attributes.
1279

1280
    """
1281
    nodenames = self.wanted
1282
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1283

    
1284
    # begin data gathering
1285

    
1286
    if self.dynamic_fields.intersection(self.op.output_fields):
1287
      live_data = {}
1288
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1289
      for name in nodenames:
1290
        nodeinfo = node_data.get(name, None)
1291
        if nodeinfo:
1292
          live_data[name] = {
1293
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1294
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1295
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1296
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1297
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1298
            "bootid": nodeinfo['bootid'],
1299
            }
1300
        else:
1301
          live_data[name] = {}
1302
    else:
1303
      live_data = dict.fromkeys(nodenames, {})
1304

    
1305
    node_to_primary = dict([(name, set()) for name in nodenames])
1306
    node_to_secondary = dict([(name, set()) for name in nodenames])
1307

    
1308
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1309
                             "sinst_cnt", "sinst_list"))
1310
    if inst_fields & frozenset(self.op.output_fields):
1311
      instancelist = self.cfg.GetInstanceList()
1312

    
1313
      for instance_name in instancelist:
1314
        inst = self.cfg.GetInstanceInfo(instance_name)
1315
        if inst.primary_node in node_to_primary:
1316
          node_to_primary[inst.primary_node].add(inst.name)
1317
        for secnode in inst.secondary_nodes:
1318
          if secnode in node_to_secondary:
1319
            node_to_secondary[secnode].add(inst.name)
1320

    
1321
    # end data gathering
1322

    
1323
    output = []
1324
    for node in nodelist:
1325
      node_output = []
1326
      for field in self.op.output_fields:
1327
        if field == "name":
1328
          val = node.name
1329
        elif field == "pinst_list":
1330
          val = list(node_to_primary[node.name])
1331
        elif field == "sinst_list":
1332
          val = list(node_to_secondary[node.name])
1333
        elif field == "pinst_cnt":
1334
          val = len(node_to_primary[node.name])
1335
        elif field == "sinst_cnt":
1336
          val = len(node_to_secondary[node.name])
1337
        elif field == "pip":
1338
          val = node.primary_ip
1339
        elif field == "sip":
1340
          val = node.secondary_ip
1341
        elif field in self.dynamic_fields:
1342
          val = live_data[node.name].get(field, None)
1343
        else:
1344
          raise errors.ParameterError(field)
1345
        node_output.append(val)
1346
      output.append(node_output)
1347

    
1348
    return output
1349

    
1350

    
1351
class LUQueryNodeVolumes(NoHooksLU):
1352
  """Logical unit for getting volumes on node(s).
1353

1354
  """
1355
  _OP_REQP = ["nodes", "output_fields"]
1356

    
1357
  def CheckPrereq(self):
1358
    """Check prerequisites.
1359

1360
    This checks that the fields required are valid output fields.
1361

1362
    """
1363
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1364

    
1365
    _CheckOutputFields(static=["node"],
1366
                       dynamic=["phys", "vg", "name", "size", "instance"],
1367
                       selected=self.op.output_fields)
1368

    
1369

    
1370
  def Exec(self, feedback_fn):
1371
    """Computes the list of nodes and their attributes.
1372

1373
    """
1374
    nodenames = self.nodes
1375
    volumes = rpc.call_node_volumes(nodenames)
1376

    
1377
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1378
             in self.cfg.GetInstanceList()]
1379

    
1380
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1381

    
1382
    output = []
1383
    for node in nodenames:
1384
      if node not in volumes or not volumes[node]:
1385
        continue
1386

    
1387
      node_vols = volumes[node][:]
1388
      node_vols.sort(key=lambda vol: vol['dev'])
1389

    
1390
      for vol in node_vols:
1391
        node_output = []
1392
        for field in self.op.output_fields:
1393
          if field == "node":
1394
            val = node
1395
          elif field == "phys":
1396
            val = vol['dev']
1397
          elif field == "vg":
1398
            val = vol['vg']
1399
          elif field == "name":
1400
            val = vol['name']
1401
          elif field == "size":
1402
            val = int(float(vol['size']))
1403
          elif field == "instance":
1404
            for inst in ilist:
1405
              if node not in lv_by_node[inst]:
1406
                continue
1407
              if vol['name'] in lv_by_node[inst][node]:
1408
                val = inst.name
1409
                break
1410
            else:
1411
              val = '-'
1412
          else:
1413
            raise errors.ParameterError(field)
1414
          node_output.append(str(val))
1415

    
1416
        output.append(node_output)
1417

    
1418
    return output
1419

    
1420

    
1421
class LUAddNode(LogicalUnit):
1422
  """Logical unit for adding node to the cluster.
1423

1424
  """
1425
  HPATH = "node-add"
1426
  HTYPE = constants.HTYPE_NODE
1427
  _OP_REQP = ["node_name"]
1428

    
1429
  def BuildHooksEnv(self):
1430
    """Build hooks env.
1431

1432
    This will run on all nodes before, and on all nodes + the new node after.
1433

1434
    """
1435
    env = {
1436
      "OP_TARGET": self.op.node_name,
1437
      "NODE_NAME": self.op.node_name,
1438
      "NODE_PIP": self.op.primary_ip,
1439
      "NODE_SIP": self.op.secondary_ip,
1440
      }
1441
    nodes_0 = self.cfg.GetNodeList()
1442
    nodes_1 = nodes_0 + [self.op.node_name, ]
1443
    return env, nodes_0, nodes_1
1444

    
1445
  def CheckPrereq(self):
1446
    """Check prerequisites.
1447

1448
    This checks:
1449
     - the new node is not already in the config
1450
     - it is resolvable
1451
     - its parameters (single/dual homed) matches the cluster
1452

1453
    Any errors are signalled by raising errors.OpPrereqError.
1454

1455
    """
1456
    node_name = self.op.node_name
1457
    cfg = self.cfg
1458

    
1459
    dns_data = utils.HostInfo(node_name)
1460

    
1461
    node = dns_data.name
1462
    primary_ip = self.op.primary_ip = dns_data.ip
1463
    secondary_ip = getattr(self.op, "secondary_ip", None)
1464
    if secondary_ip is None:
1465
      secondary_ip = primary_ip
1466
    if not utils.IsValidIP(secondary_ip):
1467
      raise errors.OpPrereqError("Invalid secondary IP given")
1468
    self.op.secondary_ip = secondary_ip
1469
    node_list = cfg.GetNodeList()
1470
    if node in node_list:
1471
      raise errors.OpPrereqError("Node %s is already in the configuration"
1472
                                 % node)
1473

    
1474
    for existing_node_name in node_list:
1475
      existing_node = cfg.GetNodeInfo(existing_node_name)
1476
      if (existing_node.primary_ip == primary_ip or
1477
          existing_node.secondary_ip == primary_ip or
1478
          existing_node.primary_ip == secondary_ip or
1479
          existing_node.secondary_ip == secondary_ip):
1480
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1481
                                   " existing node %s" % existing_node.name)
1482

    
1483
    # check that the type of the node (single versus dual homed) is the
1484
    # same as for the master
1485
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1486
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1487
    newbie_singlehomed = secondary_ip == primary_ip
1488
    if master_singlehomed != newbie_singlehomed:
1489
      if master_singlehomed:
1490
        raise errors.OpPrereqError("The master has no private ip but the"
1491
                                   " new node has one")
1492
      else:
1493
        raise errors.OpPrereqError("The master has a private ip but the"
1494
                                   " new node doesn't have one")
1495

    
1496
    # checks reachability
1497
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1498
      raise errors.OpPrereqError("Node not reachable by ping")
1499

    
1500
    if not newbie_singlehomed:
1501
      # check reachability from my secondary ip to newbie's secondary ip
1502
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1503
                           source=myself.secondary_ip):
1504
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1505
                                   " based ping to noded port")
1506

    
1507
    self.new_node = objects.Node(name=node,
1508
                                 primary_ip=primary_ip,
1509
                                 secondary_ip=secondary_ip)
1510

    
1511
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1512
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1513
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1514
                                   constants.VNC_PASSWORD_FILE)
1515

    
1516
  def Exec(self, feedback_fn):
1517
    """Adds the new node to the cluster.
1518

1519
    """
1520
    new_node = self.new_node
1521
    node = new_node.name
1522

    
1523
    # set up inter-node password and certificate and restarts the node daemon
1524
    gntpass = self.sstore.GetNodeDaemonPassword()
1525
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1526
      raise errors.OpExecError("ganeti password corruption detected")
1527
    f = open(constants.SSL_CERT_FILE)
1528
    try:
1529
      gntpem = f.read(8192)
1530
    finally:
1531
      f.close()
1532
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1533
    # so we use this to detect an invalid certificate; as long as the
1534
    # cert doesn't contain this, the here-document will be correctly
1535
    # parsed by the shell sequence below
1536
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1537
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1538
    if not gntpem.endswith("\n"):
1539
      raise errors.OpExecError("PEM must end with newline")
1540
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1541

    
1542
    # and then connect with ssh to set password and start ganeti-noded
1543
    # note that all the below variables are sanitized at this point,
1544
    # either by being constants or by the checks above
1545
    ss = self.sstore
1546
    mycommand = ("umask 077 && "
1547
                 "echo '%s' > '%s' && "
1548
                 "cat > '%s' << '!EOF.' && \n"
1549
                 "%s!EOF.\n%s restart" %
1550
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1551
                  constants.SSL_CERT_FILE, gntpem,
1552
                  constants.NODE_INITD_SCRIPT))

    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }
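    # An illustrative (made-up) return value would look like:
    #   {"name": "cluster.example.com", "software_version": "1.2.0",
    #    "protocol_version": 12, "config_version": 3, "os_api_version": 5,
    #    "export_version": 0, "master": "node1.example.com",
    #    "architecture": ("64bit", "x86_64")}
    # the actual values come from the constants module and simple store of
    # the running master.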

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from the master to some nodes.

    The file to copy is given by self.op.filename; the target nodes are
    the expanded self.op.nodes list (all nodes when that list is empty).

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Return a text representation of the cluster config.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    data = []
    for node in self.nodes:
      result = self.ssh.Run(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple (disks_ok, device_info); disks_ok is a boolean indicating
    whether the assembly succeeded, and device_info is a list of
    (host, instance_visible_name, node_visible_name) tuples mapping node
    devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
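
# Usage sketch (as done by _StartInstanceDisks and the failover code below):
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   if not disks_ok:
#     _ShutdownInstanceDisks(instance, cfg)
#     ...
# callers only rely on device_info after checking that disks_ok is True.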


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  errors on other nodes always make the function report failure.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
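
# Usage sketch: the failover path below calls this with ignore_primary=True,
# presumably because the old primary node may be unreachable at that point:
#   if not _ShutdownInstanceDisks(instance, cfg, ignore_primary=True):
#     raise errors.OpExecError("Can't shut down the instance's disks.")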


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))
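
# Example with illustrative values: starting an instance configured with
# 512 MiB of memory effectively does
#   _CheckNodeFreeMemory(cfg, instance.primary_node,
#                        "starting instance %s" % instance.name, 512)
# and aborts with OpPrereqError if the node reports less than 512 MiB free.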


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

    self.cfg.MarkInstanceUp(instance.name)


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    self.cfg.MarkInstanceDown(instance.name)
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s from cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate a set of unique LV names.

  This will generate one logical volume name for each of the given
  suffixes, based on cluster-wide unique IDs.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
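
# Illustrative result (the prefix is a cluster-generated unique ID):
#   _GenerateUniqueNames(cfg, [".sda", ".sdb"])
#   -> ["<unique-id>.sda", "<unique-id>.sdb"]
# The generators below combine these names with the cluster VG name to form
# the logical_id of each LV-backed disk.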


def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta])
  return drbd_dev


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
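
# The resulting device tree for one such branch is, schematically:
#   LD_DRBD8 (or LD_DRBD7) on (primary, secondary, port)
#     +- LD_LV <name>_data  (size MB, the actual disk contents)
#     +- LD_LV <name>_meta  (128 MB, DRBD metadata)
# i.e. a network-mirrored device backed by a data LV and a metadata LV.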


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_LOCAL_RAID1:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
                                       ".sdb_m1", ".sdb_m2"])
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[0]))
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[1]))
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              size=disk_sz,
                              children=[sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[2]))
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[3]))
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              size=swap_sz,
                              children=[sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_REMOTE_RAID1:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2])
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              children=[drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4])
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              children=[drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
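
# Illustrative call for the drbd8 template (sizes in MB):
#   _GenerateDiskTemplate(cfg, constants.DT_DRBD8, "inst1.example.com",
#                         "node1", ["node2"], 10240, 4096)
# returns two LD_DRBD8 disks, "sda" (10240 MB) and "sdb" (4096 MB), each
# mirrored between node1 and node2 via _GenerateDRBD8Branch.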


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name
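
# For an instance named "inst1.example.com" (illustrative) this returns
# "originstname+inst1.example.com"; _CreateDisks below passes it, via the
# _CreateBlockDevOn* helpers, as the info argument of
# rpc.call_blockdev_create.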


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False
  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result
2900

    
2901

    
2902
class LUCreateInstance(LogicalUnit):
2903
  """Create an instance.
2904

2905
  """
2906
  HPATH = "instance-add"
2907
  HTYPE = constants.HTYPE_INSTANCE
2908
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2909
              "disk_template", "swap_size", "mode", "start", "vcpus",
2910
              "wait_for_sync", "ip_check", "mac"]
2911

    
2912
  def BuildHooksEnv(self):
2913
    """Build hooks env.
2914

2915
    This runs on master, primary and secondary nodes of the instance.
2916

2917
    """
2918
    env = {
2919
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2920
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2921
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2922
      "INSTANCE_ADD_MODE": self.op.mode,
2923
      }
2924
    if self.op.mode == constants.INSTANCE_IMPORT:
2925
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2926
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2927
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2928

    
2929
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: None,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
    }
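    # Sizing example (illustrative numbers only): with disk_size=10240 and
    # swap_size=4096, DT_PLAIN needs 14336 MB, DT_LOCAL_RAID1 needs 28672 MB
    # and DT_REMOTE_RAID1/DT_DRBD8 need 14592 MB of free space in the volume
    # group of each involved node.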

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError("Disk template '%s' size requirement"
                                   " is unknown" %  self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # instance verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None
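    # (Note, assumption about the surrounding constants: HTS_REQ_PORT lists
    # the hypervisor types that need a per-instance TCP port, e.g. for an HVM
    # VNC console; for all other hypervisors no port is allocated.)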

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    cmd = self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
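    # The tuple returned below is (program, full argv); the caller (presumably
    # the command-line client) is expected to exec this ssh command itself so
    # that the console gets a real interactive terminal.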
    return cmd[0], cmd


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave devices."
                                 " This would create a 3-disk raid1 which we"
                                 " don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
                                    instance, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance, self.proc)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
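    # logical_id[0:2] of the drbd child are its two peer node names, so the
    # index below just selects whichever of them is not the primary node.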
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechildren(instance.primary_node,
                                            disk, [child]):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
3864
    steps_total = 6
3865
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3866
    instance = self.instance
3867
    iv_names = {}
3868
    vgname = self.cfg.GetVGName()
3869
    # start of work
3870
    cfg = self.cfg
3871
    old_node = self.tgt_node
3872
    new_node = self.new_node
3873
    pri_node = instance.primary_node
3874

    
3875
    # Step: check device activation
3876
    self.proc.LogStep(1, steps_total, "check device existence")
3877
    info("checking volume groups")
3878
    my_vg = cfg.GetVGName()
3879
    results = rpc.call_vg_list([pri_node, new_node])
3880
    if not results:
3881
      raise errors.OpExecError("Can't list volume groups on the nodes")
3882
    for node in pri_node, new_node:
3883
      res = results.get(node, False)
3884
      if not res or my_vg not in res:
3885
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3886
                                 (my_vg, node))
3887
    for dev in instance.disks:
3888
      if not dev.iv_name in self.op.disks:
3889
        continue
3890
      info("checking %s on %s" % (dev.iv_name, pri_node))
3891
      cfg.SetDiskID(dev, pri_node)
3892
      if not rpc.call_blockdev_find(pri_node, dev):
3893
        raise errors.OpExecError("Can't find device %s on node %s" %
3894
                                 (dev.iv_name, pri_node))
3895

    
3896
    # Step: check other node consistency
3897
    self.proc.LogStep(2, steps_total, "check peer consistency")
3898
    for dev in instance.disks:
3899
      if not dev.iv_name in self.op.disks:
3900
        continue
3901
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3902
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3903
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3904
                                 " unsafe to replace the secondary" %
3905
                                 pri_node)
3906

    
3907
    # Step: create new storage
3908
    self.proc.LogStep(3, steps_total, "allocate new storage")
3909
    for dev in instance.disks:
3910
      size = dev.size
3911
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3912
      # since we *always* want to create this LV, we use the
3913
      # _Create...OnPrimary (which forces the creation), even if we
3914
      # are talking about the secondary node
3915
      for new_lv in dev.children:
3916
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3917
                                        _GetInstanceInfoText(instance)):
3918
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3919
                                   " node '%s'" %
3920
                                   (new_lv.logical_id[1], new_node))
3921

    
3922
      iv_names[dev.iv_name] = (dev, dev.children)
3923

    
3924
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3925
    for dev in instance.disks:
3926
      size = dev.size
3927
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3928
      # create new devices on new_node
3929
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3930
                              logical_id=(pri_node, new_node,
3931
                                          dev.logical_id[2]),
3932
                              children=dev.children)
3933
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3934
                                        new_drbd, False,
3935
                                      _GetInstanceInfoText(instance)):
3936
        raise errors.OpExecError("Failed to create new DRBD on"
3937
                                 " node '%s'" % new_node)
3938

    
3939
    for dev in instance.disks:
3940
      # we have new devices, shutdown the drbd on the old secondary
3941
      info("shutting down drbd for %s on old node" % dev.iv_name)
3942
      cfg.SetDiskID(dev, old_node)
3943
      if not rpc.call_blockdev_shutdown(old_node, dev):
3944
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3945
                hint="Please cleanup this device manually as soon as possible")
3946

    
3947
    info("detaching primary drbds from the network (=> standalone)")
3948
    done = 0
3949
    for dev in instance.disks:
3950
      cfg.SetDiskID(dev, pri_node)
3951
      # set the physical (unique in bdev terms) id to None, meaning
3952
      # detach from network
3953
      dev.physical_id = (None,) * len(dev.physical_id)
3954
      # and 'find' the device, which will 'fix' it to match the
3955
      # standalone state
3956
      if rpc.call_blockdev_find(pri_node, dev):
3957
        done += 1
3958
      else:
3959
        warning("Failed to detach drbd %s from network, unusual case" %
3960
                dev.iv_name)
3961

    
3962
    if not done:
3963
      # no detaches succeeded (very unlikely)
3964
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3965

    
3966
    # if we managed to detach at least one, we update all the disks of
3967
    # the instance to point to the new secondary
3968
    info("updating instance configuration")
3969
    for dev in instance.disks:
3970
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3971
      cfg.SetDiskID(dev, pri_node)
3972
    cfg.Update(instance)
3973

    
3974
    # and now perform the drbd attach
3975
    info("attaching primary drbds to new secondary (standalone => connected)")
3976
    failures = []
3977
    for dev in instance.disks:
3978
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3979
      # since the attach is smart, it's enough to 'find' the device,
3980
      # it will automatically activate the network, if the physical_id
3981
      # is correct
3982
      cfg.SetDiskID(dev, pri_node)
3983
      if not rpc.call_blockdev_find(pri_node, dev):
3984
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3985
                "please do a gnt-instance info to see the status of disks")
3986

    
3987
    # this can fail as the old devices are degraded and _WaitForSync
3988
    # does a combined result over all disks, so we don't check its
3989
    # return value
3990
    self.proc.LogStep(5, steps_total, "sync devices")
3991
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3992

    
3993
    # so check manually all the devices
3994
    for name, (dev, old_lvs) in iv_names.iteritems():
3995
      cfg.SetDiskID(dev, pri_node)
3996
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3997
      if is_degr:
3998
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3999

    
4000
    self.proc.LogStep(6, steps_total, "removing old storage")
4001
    for name, (dev, old_lvs) in iv_names.iteritems():
4002
      info("remove logical volumes for %s" % name)
4003
      for lv in old_lvs:
4004
        cfg.SetDiskID(lv, old_node)
4005
        if not rpc.call_blockdev_remove(old_node, lv):
4006
          warning("Can't remove LV on old secondary",
4007
                  hint="Cleanup stale volumes by hand")
4008

    
4009
  def Exec(self, feedback_fn):
4010
    """Execute disk replacement.
4011

4012
    This dispatches the disk replacement to the appropriate handler.
4013

4014
    """
4015
    instance = self.instance
4016
    if instance.disk_template == constants.DT_REMOTE_RAID1:
4017
      fn = self._ExecRR1
4018
    elif instance.disk_template == constants.DT_DRBD8:
4019
      if self.op.remote_node is None:
4020
        fn = self._ExecD8DiskOnly
4021
      else:
4022
        fn = self._ExecD8Secondary
4023
    else:
4024
      raise errors.ProgrammerError("Unhandled disk replacement case")
4025
    return fn(feedback_fn)
4026

    
4027

    
4028
class LUQueryInstanceData(NoHooksLU):
4029
  """Query runtime instance data.
4030

4031
  """
4032
  _OP_REQP = ["instances"]
4033

    
4034
  def CheckPrereq(self):
4035
    """Check prerequisites.
4036

4037
    This only checks the optional instance list against the existing names.
4038

4039
    """
4040
    if not isinstance(self.op.instances, list):
4041
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4042
    if self.op.instances:
4043
      self.wanted_instances = []
4044
      names = self.op.instances
4045
      for name in names:
4046
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4047
        if instance is None:
4048
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4049
        self.wanted_instances.append(instance)
4050
    else:
4051
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4052
                               in self.cfg.GetInstanceList()]
4053
    return
4054

    
4055

    
4056
  def _ComputeDiskStatus(self, instance, snode, dev):
4057
    """Compute block device status.
4058

4059
    """
4060
    self.cfg.SetDiskID(dev, instance.primary_node)
4061
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4062
    if dev.dev_type in constants.LDS_DRBD:
4063
      # we change the snode then (otherwise we use the one passed in)
4064
      if dev.logical_id[0] == instance.primary_node:
4065
        snode = dev.logical_id[1]
4066
      else:
4067
        snode = dev.logical_id[0]
4068

    
4069
    if snode:
4070
      self.cfg.SetDiskID(dev, snode)
4071
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4072
    else:
4073
      dev_sstatus = None
4074

    
4075
    if dev.children:
4076
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4077
                      for child in dev.children]
4078
    else:
4079
      dev_children = []
4080

    
4081
    data = {
4082
      "iv_name": dev.iv_name,
4083
      "dev_type": dev.dev_type,
4084
      "logical_id": dev.logical_id,
4085
      "physical_id": dev.physical_id,
4086
      "pstatus": dev_pstatus,
4087
      "sstatus": dev_sstatus,
4088
      "children": dev_children,
4089
      }
4090

    
4091
    return data
4092

    
4093
  def Exec(self, feedback_fn):
4094
    """Gather and return data"""
4095
    result = {}
4096
    for instance in self.wanted_instances:
4097
      remote_info = rpc.call_instance_info(instance.primary_node,
4098
                                                instance.name)
4099
      if remote_info and "state" in remote_info:
4100
        remote_state = "up"
4101
      else:
4102
        remote_state = "down"
4103
      if instance.status == "down":
4104
        config_state = "down"
4105
      else:
4106
        config_state = "up"
4107

    
4108
      disks = [self._ComputeDiskStatus(instance, None, device)
4109
               for device in instance.disks]
4110

    
4111
      idict = {
4112
        "name": instance.name,
4113
        "config_state": config_state,
4114
        "run_state": remote_state,
4115
        "pnode": instance.primary_node,
4116
        "snodes": instance.secondary_nodes,
4117
        "os": instance.os,
4118
        "memory": instance.memory,
4119
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4120
        "disks": disks,
4121
        "network_port": instance.network_port,
4122
        "vcpus": instance.vcpus,
4123
        "kernel_path": instance.kernel_path,
4124
        "initrd_path": instance.initrd_path,
4125
        "hvm_boot_order": instance.hvm_boot_order,
4126
        }
4127

    
4128
      result[instance.name] = idict
4129

    
4130
    return result
4131

    
4132

    
4133
class LUSetInstanceParms(LogicalUnit):
4134
  """Modifies an instances's parameters.
4135

4136
  """
4137
  HPATH = "instance-modify"
4138
  HTYPE = constants.HTYPE_INSTANCE
4139
  _OP_REQP = ["instance_name"]
4140

    
4141
  def BuildHooksEnv(self):
4142
    """Build hooks env.
4143

4144
    This runs on the master, primary and secondaries.
4145

4146
    """
4147
    args = dict()
4148
    if self.mem:
4149
      args['memory'] = self.mem
4150
    if self.vcpus:
4151
      args['vcpus'] = self.vcpus
4152
    if self.do_ip or self.do_bridge or self.mac:
4153
      if self.do_ip:
4154
        ip = self.ip
4155
      else:
4156
        ip = self.instance.nics[0].ip
4157
      if self.bridge:
4158
        bridge = self.bridge
4159
      else:
4160
        bridge = self.instance.nics[0].bridge
4161
      if self.mac:
4162
        mac = self.mac
4163
      else:
4164
        mac = self.instance.nics[0].mac
4165
      args['nics'] = [(ip, bridge, mac)]
4166
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4167
    nl = [self.sstore.GetMasterNode(),
4168
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4169
    return env, nl, nl
4170

    
4171
  def CheckPrereq(self):
4172
    """Check prerequisites.
4173

4174
    This only checks the instance list against the existing names.
4175

4176
    """
4177
    self.mem = getattr(self.op, "mem", None)
4178
    self.vcpus = getattr(self.op, "vcpus", None)
4179
    self.ip = getattr(self.op, "ip", None)
4180
    self.mac = getattr(self.op, "mac", None)
4181
    self.bridge = getattr(self.op, "bridge", None)
4182
    self.kernel_path = getattr(self.op, "kernel_path", None)
4183
    self.initrd_path = getattr(self.op, "initrd_path", None)
4184
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4185
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4186
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
4187
    if all_parms.count(None) == len(all_parms):
4188
      raise errors.OpPrereqError("No changes submitted")
4189
    if self.mem is not None:
4190
      try:
4191
        self.mem = int(self.mem)
4192
      except ValueError, err:
4193
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4194
    if self.vcpus is not None:
4195
      try:
4196
        self.vcpus = int(self.vcpus)
4197
      except ValueError, err:
4198
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4199
    if self.ip is not None:
4200
      self.do_ip = True
4201
      if self.ip.lower() == "none":
4202
        self.ip = None
4203
      else:
4204
        if not utils.IsValidIP(self.ip):
4205
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4206
    else:
4207
      self.do_ip = False
4208
    self.do_bridge = (self.bridge is not None)
4209
    if self.mac is not None:
4210
      if self.cfg.IsMacInUse(self.mac):
4211
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4212
                                   self.mac)
4213
      if not utils.IsValidMac(self.mac):
4214
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4215

    
4216
    if self.kernel_path is not None:
4217
      self.do_kernel_path = True
4218
      if self.kernel_path == constants.VALUE_NONE:
4219
        raise errors.OpPrereqError("Can't set instance to no kernel")
4220

    
4221
      if self.kernel_path != constants.VALUE_DEFAULT:
4222
        if not os.path.isabs(self.kernel_path):
4223
          raise errors.OpPrereqError("The kernel path must be an absolute"
4224
                                    " filename")
4225
    else:
4226
      self.do_kernel_path = False
4227

    
4228
    if self.initrd_path is not None:
4229
      self.do_initrd_path = True
4230
      if self.initrd_path not in (constants.VALUE_NONE,
4231
                                  constants.VALUE_DEFAULT):
4232
        if not os.path.isabs(self.initrd_path):
4233
          raise errors.OpPrereqError("The initrd path must be an absolute"
4234
                                    " filename")
4235
    else:
4236
      self.do_initrd_path = False
4237

    
4238
    # boot order verification
4239
    if self.hvm_boot_order is not None:
4240
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4241
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4242
          raise errors.OpPrereqError("invalid boot order specified,"
4243
                                     " must be one or more of [acdn]"
4244
                                     " or 'default'")
4245

    
4246
    instance = self.cfg.GetInstanceInfo(
4247
      self.cfg.ExpandInstanceName(self.op.instance_name))
4248
    if instance is None:
4249
      raise errors.OpPrereqError("No such instance name '%s'" %
4250
                                 self.op.instance_name)
4251
    self.op.instance_name = instance.name
4252
    self.instance = instance
4253
    return
4254

    
4255
  def Exec(self, feedback_fn):
4256
    """Modifies an instance.
4257

4258
    All parameters take effect only at the next restart of the instance.
4259
    """
4260
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

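    # write the modified instance definition back to the configuration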
    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
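    # an empty node list is expanded by _GetWantedNodes to all nodes in the
    # cluster (see the related note in LUExportInstance.Exec)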
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and the destination node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

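    # only the first disk ("sda") is snapshotted; if the instance was shut
    # down above, the finally clause below restarts it even if the snapshot
    # fails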
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

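    # copy each snapshot to the target node and then remove the snapshot
    # device from the source node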
    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
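    # resolve self.op.kind/self.op.name into the object whose tags will be
    # read or modified and remember it as self.target for the derived LUs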
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs matching the search pattern.

    """
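    # the search runs over the cluster object, all instances and all nodes;
    # a result entry looks like ("/instances/instance1.example.com", "web")
    # (example values only)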
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets one or more tags on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
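    # a ConfigurationError from cfg.Update means the configuration was
    # modified concurrently; report it as a retryable error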
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tags.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
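    # build the set of tags to delete and verify that it is a subset of the
    # tags currently set on the target ("<=" is the subset test for sets)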
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
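    # the master-side delay runs locally via utils.TestDelay; the node-side
    # delays go over RPC and every contacted node must report success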
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))