lib/cmdlib.py @ e7c6e02b
#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import simplejson

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf


# Check whether the simplejson module supports indentation
_JSON_INDENT = 2
try:
  simplejson.dumps(1, indent=_JSON_INDENT)
except TypeError:
  _JSON_INDENT = None
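# Note: older simplejson versions do not accept the "indent" keyword and
# raise TypeError for it; in that case the probe above leaves _JSON_INDENT
# set to None so callers can fall back to unindented output.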


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). "No
    nodes" should be represented as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
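# Example usage (as done by the query LUs further down, e.g. LUQueryNodes):
#   _CheckOutputFields(static=["name", "pinst_cnt", ...],
#                      dynamic=self.dynamic_fields,
#                      selected=self.op.output_fields)
# Any selected field outside the static/dynamic union raises OpPrereqError.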


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
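# Illustration: for a hypothetical single-NIC instance "inst1" on "node1",
# the result would contain INSTANCE_NAME=inst1, INSTANCE_PRIMARY=node1,
# INSTANCE_NIC_COUNT=1 and INSTANCE_NIC0_{IP,BRIDGE,HWADDR}; the hooks
# runner later adds the GANETI_ prefix to each key.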


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.


  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as an FQDN

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
              "def_bridge", "master_netdev", "file_storage_dir"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the loopback"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    if not hasattr(self.op, "vg_name"):
      self.op.vg_name = None
    # if vg_name not None, checks if volume group is valid
    if self.op.vg_name:
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
      if vgstatus:
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
                                   " you are not using lvm" % vgstatus)

    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)

    if not os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory you specified is"
                                 " not an absolute path.")

    if not os.path.exists(self.op.file_storage_dir):
      try:
        os.makedirs(self.op.file_storage_dir, 0750)
      except OSError, err:
        raise errors.OpPrereqError("Cannot create file storage directory"
                                   " '%s': %s" %
                                   (self.op.file_storage_dir, err))

    if not os.path.isdir(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory '%s' is not"
                                 " a directory." % self.op.file_storage_dir)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)
    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
    
572

    
573
class LUDestroyCluster(NoHooksLU):
574
  """Logical unit for destroying the cluster.
575

576
  """
577
  _OP_REQP = []
578

    
579
  def CheckPrereq(self):
580
    """Check prerequisites.
581

582
    This checks whether the cluster is empty.
583

584
    Any errors are signalled by raising errors.OpPrereqError.
585

586
    """
587
    master = self.sstore.GetMasterNode()
588

    
589
    nodelist = self.cfg.GetNodeList()
590
    if len(nodelist) != 1 or nodelist[0] != master:
591
      raise errors.OpPrereqError("There are still %d node(s) in"
592
                                 " this cluster." % (len(nodelist) - 1))
593
    instancelist = self.cfg.GetInstanceList()
594
    if instancelist:
595
      raise errors.OpPrereqError("There are still %d instance(s) in"
596
                                 " this cluster." % len(instancelist))
597

    
598
  def Exec(self, feedback_fn):
599
    """Destroys the cluster.
600

601
    """
602
    master = self.sstore.GetMasterNode()
603
    if not rpc.call_node_stop_master(master):
604
      raise errors.OpExecError("Could not disable the master role")
605
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
606
    utils.CreateBackup(priv_key)
607
    utils.CreateBackup(pub_key)
608
    rpc.call_node_leave_cluster(master)
609

    
610

    
611
class LUVerifyCluster(NoHooksLU):
612
  """Verifies the cluster status.
613

614
  """
615
  _OP_REQP = ["skip_checks"]
616

    
617
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
618
                  remote_version, feedback_fn):
619
    """Run multiple tests against a node.
620

621
    Test list:
622
      - compares ganeti version
623
      - checks vg existance and size > 20G
624
      - checks config file checksum
625
      - checks ssh to other nodes
626

627
    Args:
628
      node: name of the node to check
629
      file_list: required list of files
630
      local_cksum: dictionary of local files and their checksums
631

632
    """
633
    # compares ganeti version
634
    local_version = constants.PROTOCOL_VERSION
635
    if not remote_version:
636
      feedback_fn("  - ERROR: connection to %s failed" % (node))
637
      return True
638

    
639
    if local_version != remote_version:
640
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
641
                      (local_version, node, remote_version))
642
      return True
643

    
644
    # checks vg existance and size > 20G
645

    
646
    bad = False
647
    if not vglist:
648
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
649
                      (node,))
650
      bad = True
651
    else:
652
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
653
      if vgstatus:
654
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
655
        bad = True
656

    
657
    # checks config file checksum
658
    # checks ssh to any
659

    
660
    if 'filelist' not in node_result:
661
      bad = True
662
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
663
    else:
664
      remote_cksum = node_result['filelist']
665
      for file_name in file_list:
666
        if file_name not in remote_cksum:
667
          bad = True
668
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
669
        elif remote_cksum[file_name] != local_cksum[file_name]:
670
          bad = True
671
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
672

    
673
    if 'nodelist' not in node_result:
674
      bad = True
675
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
676
    else:
677
      if node_result['nodelist']:
678
        bad = True
679
        for node in node_result['nodelist']:
680
          feedback_fn("  - ERROR: communication with node '%s': %s" %
681
                          (node, node_result['nodelist'][node]))
682
    hyp_result = node_result.get('hypervisor', None)
683
    if hyp_result is not None:
684
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
685
    return bad
686

    
687
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688
                      node_instance, feedback_fn):
689
    """Verify an instance.
690

691
    This function checks to see if the required block devices are
692
    available on the instance's node.
693

694
    """
695
    bad = False
696

    
697
    node_current = instanceconfig.primary_node
698

    
699
    node_vol_should = {}
700
    instanceconfig.MapLVsByNode(node_vol_should)
701

    
702
    for node in node_vol_should:
703
      for volume in node_vol_should[node]:
704
        if node not in node_vol_is or volume not in node_vol_is[node]:
705
          feedback_fn("  - ERROR: volume %s missing on node %s" %
706
                          (volume, node))
707
          bad = True
708

    
709
    if not instanceconfig.status == 'down':
710
      if (node_current not in node_instance or
711
          not instance in node_instance[node_current]):
712
        feedback_fn("  - ERROR: instance %s not running on node %s" %
713
                        (instance, node_current))
714
        bad = True
715

    
716
    for node in node_instance:
717
      if (not node == node_current):
718
        if instance in node_instance[node]:
719
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
720
                          (instance, node))
721
          bad = True
722

    
723
    return bad
724

    
725
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726
    """Verify if there are any unknown volumes in the cluster.
727

728
    The .os, .swap and backup volumes are ignored. All other volumes are
729
    reported as unknown.
730

731
    """
732
    bad = False
733

    
734
    for node in node_vol_is:
735
      for volume in node_vol_is[node]:
736
        if node not in node_vol_should or volume not in node_vol_should[node]:
737
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738
                      (volume, node))
739
          bad = True
740
    return bad
741

    
742
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743
    """Verify the list of running instances.
744

745
    This checks what instances are running but unknown to the cluster.
746

747
    """
748
    bad = False
749
    for node in node_instance:
750
      for runninginstance in node_instance[node]:
751
        if runninginstance not in instancelist:
752
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753
                          (runninginstance, node))
754
          bad = True
755
    return bad
756

    
757
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758
    """Verify N+1 Memory Resilience.
759

760
    Check that if one single node dies we can still start all the instances it
761
    was primary for.
762

763
    """
764
    bad = False
765

    
766
    for node, nodeinfo in node_info.iteritems():
767
      # This code checks that every node which is now listed as secondary has
768
      # enough memory to host all instances it is supposed to should a single
769
      # other node in the cluster fail.
770
      # FIXME: not ready for failover to an arbitrary node
771
      # FIXME: does not support file-backed instances
772
      # WARNING: we currently take into account down instances as well as up
773
      # ones, considering that even if they're down someone might want to start
774
      # them even in the event of a node failure.
775
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
776
        needed_mem = 0
777
        for instance in instances:
778
          needed_mem += instance_cfg[instance].memory
779
        if nodeinfo['mfree'] < needed_mem:
780
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
781
                      " failovers should node %s fail" % (node, prinode))
782
          bad = True
783
    return bad
784

    
785
  def CheckPrereq(self):
786
    """Check prerequisites.
787

788
    Transform the list of checks we're going to skip into a set and check that
789
    all its members are valid.
790

791
    """
792
    self.skip_set = frozenset(self.op.skip_checks)
793
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
794
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
795

    
796
  def Exec(self, feedback_fn):
797
    """Verify integrity of cluster, performing various test on nodes.
798

799
    """
800
    bad = False
801
    feedback_fn("* Verifying global settings")
802
    for msg in self.cfg.VerifyConfig():
803
      feedback_fn("  - ERROR: %s" % msg)
804

    
805
    vg_name = self.cfg.GetVGName()
806
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
807
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
808
    i_non_redundant = [] # Non redundant instances
809
    node_volume = {}
810
    node_instance = {}
811
    node_info = {}
812
    instance_cfg = {}
813

    
814
    # FIXME: verify OS list
815
    # do local checksums
816
    file_names = list(self.sstore.GetFileList())
817
    file_names.append(constants.SSL_CERT_FILE)
818
    file_names.append(constants.CLUSTER_CONF_FILE)
819
    local_checksums = utils.FingerprintFiles(file_names)
820

    
821
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
822
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
823
    all_instanceinfo = rpc.call_instance_list(nodelist)
824
    all_vglist = rpc.call_vg_list(nodelist)
825
    node_verify_param = {
826
      'filelist': file_names,
827
      'nodelist': nodelist,
828
      'hypervisor': None,
829
      }
830
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
831
    all_rversion = rpc.call_version(nodelist)
832
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
833

    
834
    for node in nodelist:
835
      feedback_fn("* Verifying node %s" % node)
836
      result = self._VerifyNode(node, file_names, local_checksums,
837
                                all_vglist[node], all_nvinfo[node],
838
                                all_rversion[node], feedback_fn)
839
      bad = bad or result
840

    
841
      # node_volume
842
      volumeinfo = all_volumeinfo[node]
843

    
844
      if isinstance(volumeinfo, basestring):
845
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
846
                    (node, volumeinfo[-400:].encode('string_escape')))
847
        bad = True
848
        node_volume[node] = {}
849
      elif not isinstance(volumeinfo, dict):
850
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
851
        bad = True
852
        continue
853
      else:
854
        node_volume[node] = volumeinfo
855

    
856
      # node_instance
857
      nodeinstance = all_instanceinfo[node]
858
      if type(nodeinstance) != list:
859
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
860
        bad = True
861
        continue
862

    
863
      node_instance[node] = nodeinstance
864

    
865
      # node_info
866
      nodeinfo = all_ninfo[node]
867
      if not isinstance(nodeinfo, dict):
868
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
869
        bad = True
870
        continue
871

    
872
      try:
873
        node_info[node] = {
874
          "mfree": int(nodeinfo['memory_free']),
875
          "dfree": int(nodeinfo['vg_free']),
876
          "pinst": [],
877
          "sinst": [],
878
          # dictionary holding all instances this node is secondary for,
879
          # grouped by their primary node. Each key is a cluster node, and each
880
          # value is a list of instances which have the key as primary and the
881
          # current node as secondary.  this is handy to calculate N+1 memory
882
          # availability if you can only failover from a primary to its
883
          # secondary.
884
          "sinst-by-pnode": {},
885
        }
886
      except ValueError:
887
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
888
        bad = True
889
        continue
890

    
891
    node_vol_should = {}
892

    
893
    for instance in instancelist:
894
      feedback_fn("* Verifying instance %s" % instance)
895
      inst_config = self.cfg.GetInstanceInfo(instance)
896
      result =  self._VerifyInstance(instance, inst_config, node_volume,
897
                                     node_instance, feedback_fn)
898
      bad = bad or result
899

    
900
      inst_config.MapLVsByNode(node_vol_should)
901

    
902
      instance_cfg[instance] = inst_config
903

    
904
      pnode = inst_config.primary_node
905
      if pnode in node_info:
906
        node_info[pnode]['pinst'].append(instance)
907
      else:
908
        feedback_fn("  - ERROR: instance %s, connection to primary node"
909
                    " %s failed" % (instance, pnode))
910
        bad = True
911

    
912
      # If the instance is non-redundant we cannot survive losing its primary
913
      # node, so we are not N+1 compliant. On the other hand we have no disk
914
      # templates with more than one secondary so that situation is not well
915
      # supported either.
916
      # FIXME: does not support file-backed instances
917
      if len(inst_config.secondary_nodes) == 0:
918
        i_non_redundant.append(instance)
919
      elif len(inst_config.secondary_nodes) > 1:
920
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
921
                    % instance)
922

    
923
      for snode in inst_config.secondary_nodes:
924
        if snode in node_info:
925
          node_info[snode]['sinst'].append(instance)
926
          if pnode not in node_info[snode]['sinst-by-pnode']:
927
            node_info[snode]['sinst-by-pnode'][pnode] = []
928
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
929
        else:
930
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
931
                      " %s failed" % (instance, snode))
932

    
933
    feedback_fn("* Verifying orphan volumes")
934
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
935
                                       feedback_fn)
936
    bad = bad or result
937

    
938
    feedback_fn("* Verifying remaining instances")
939
    result = self._VerifyOrphanInstances(instancelist, node_instance,
940
                                         feedback_fn)
941
    bad = bad or result
942

    
943
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
944
      feedback_fn("* Verifying N+1 Memory redundancy")
945
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
946
      bad = bad or result
947

    
948
    feedback_fn("* Other Notes")
949
    if i_non_redundant:
950
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
951
                  % len(i_non_redundant))
952

    
953
    return int(bad)
954

    
955

    
956
class LUVerifyDisks(NoHooksLU):
957
  """Verifies the cluster disks status.
958

959
  """
960
  _OP_REQP = []
961

    
962
  def CheckPrereq(self):
963
    """Check prerequisites.
964

965
    This has no prerequisites.
966

967
    """
968
    pass
969

    
970
  def Exec(self, feedback_fn):
971
    """Verify integrity of cluster disks.
972

973
    """
974
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
975

    
976
    vg_name = self.cfg.GetVGName()
977
    nodes = utils.NiceSort(self.cfg.GetNodeList())
978
    instances = [self.cfg.GetInstanceInfo(name)
979
                 for name in self.cfg.GetInstanceList()]
980

    
981
    nv_dict = {}
982
    for inst in instances:
983
      inst_lvs = {}
984
      if (inst.status != "up" or
985
          inst.disk_template not in constants.DTS_NET_MIRROR):
986
        continue
987
      inst.MapLVsByNode(inst_lvs)
988
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
989
      for node, vol_list in inst_lvs.iteritems():
990
        for vol in vol_list:
991
          nv_dict[(node, vol)] = inst
992

    
993
    if not nv_dict:
994
      return result
995

    
996
    node_lvs = rpc.call_volume_list(nodes, vg_name)
997

    
998
    to_act = set()
999
    for node in nodes:
1000
      # node_volume
1001
      lvs = node_lvs[node]
1002

    
1003
      if isinstance(lvs, basestring):
1004
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1005
        res_nlvm[node] = lvs
1006
      elif not isinstance(lvs, dict):
1007
        logger.Info("connection to node %s failed or invalid data returned" %
1008
                    (node,))
1009
        res_nodes.append(node)
1010
        continue
1011

    
1012
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1013
        inst = nv_dict.pop((node, lv_name), None)
1014
        if (not lv_online and inst is not None
1015
            and inst.name not in res_instances):
1016
          res_instances.append(inst.name)
1017

    
1018
    # any leftover items in nv_dict are missing LVs, let's arrange the
1019
    # data better
1020
    for key, inst in nv_dict.iteritems():
1021
      if inst.name not in res_missing:
1022
        res_missing[inst.name] = []
1023
      res_missing[inst.name].append(key)
1024

    
1025
    return result
1026

    
1027

    
1028
class LURenameCluster(LogicalUnit):
1029
  """Rename the cluster.
1030

1031
  """
1032
  HPATH = "cluster-rename"
1033
  HTYPE = constants.HTYPE_CLUSTER
1034
  _OP_REQP = ["name"]
1035

    
1036
  def BuildHooksEnv(self):
1037
    """Build hooks env.
1038

1039
    """
1040
    env = {
1041
      "OP_TARGET": self.sstore.GetClusterName(),
1042
      "NEW_NAME": self.op.name,
1043
      }
1044
    mn = self.sstore.GetMasterNode()
1045
    return env, [mn], [mn]
1046

    
1047
  def CheckPrereq(self):
1048
    """Verify that the passed name is a valid one.
1049

1050
    """
1051
    hostname = utils.HostInfo(self.op.name)
1052

    
1053
    new_name = hostname.name
1054
    self.ip = new_ip = hostname.ip
1055
    old_name = self.sstore.GetClusterName()
1056
    old_ip = self.sstore.GetMasterIP()
1057
    if new_name == old_name and new_ip == old_ip:
1058
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1059
                                 " cluster has changed")
1060
    if new_ip != old_ip:
1061
      result = utils.RunCmd(["fping", "-q", new_ip])
1062
      if not result.failed:
1063
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1064
                                   " reachable on the network. Aborting." %
1065
                                   new_ip)
1066

    
1067
    self.op.name = new_name
1068

    
1069
  def Exec(self, feedback_fn):
1070
    """Rename the cluster.
1071

1072
    """
1073
    clustername = self.op.name
1074
    ip = self.ip
1075
    ss = self.sstore
1076

    
1077
    # shutdown the master IP
1078
    master = ss.GetMasterNode()
1079
    if not rpc.call_node_stop_master(master):
1080
      raise errors.OpExecError("Could not disable the master role")
1081

    
1082
    try:
1083
      # modify the sstore
1084
      ss.SetKey(ss.SS_MASTER_IP, ip)
1085
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1086

    
1087
      # Distribute updated ss config to all nodes
1088
      myself = self.cfg.GetNodeInfo(master)
1089
      dist_nodes = self.cfg.GetNodeList()
1090
      if myself.name in dist_nodes:
1091
        dist_nodes.remove(myself.name)
1092

    
1093
      logger.Debug("Copying updated ssconf data to all nodes")
1094
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1095
        fname = ss.KeyToFilename(keyname)
1096
        result = rpc.call_upload_file(dist_nodes, fname)
1097
        for to_node in dist_nodes:
1098
          if not result[to_node]:
1099
            logger.Error("copy of file %s to node %s failed" %
1100
                         (fname, to_node))
1101
    finally:
1102
      if not rpc.call_node_start_master(master):
1103
        logger.Error("Could not re-enable the master role on the master,"
1104
                     " please restart manually.")
1105

    
1106

    
1107
def _RecursiveCheckIfLVMBased(disk):
1108
  """Check if the given disk or its children are lvm-based.
1109

1110
  Args:
1111
    disk: ganeti.objects.Disk object
1112

1113
  Returns:
1114
    boolean indicating whether a LD_LV dev_type was found or not
1115

1116
  """
1117
  if disk.children:
1118
    for chdisk in disk.children:
1119
      if _RecursiveCheckIfLVMBased(chdisk):
1120
        return True
1121
  return disk.dev_type == constants.LD_LV
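# Illustration: a DRBD-backed disk whose children are plain logical volumes
# is reported as LVM-based through the recursion above, while a purely
# file-backed disk (no LV anywhere in its tree) is not.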


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result

    
1272

    
1273
class LUDiagnoseOS(NoHooksLU):
1274
  """Logical unit for OS diagnose/query.
1275

1276
  """
1277
  _OP_REQP = ["output_fields", "names"]
1278

    
1279
  def CheckPrereq(self):
1280
    """Check prerequisites.
1281

1282
    This always succeeds, since this is a pure query LU.
1283

1284
    """
1285
    if self.op.names:
1286
      raise errors.OpPrereqError("Selective OS query not supported")
1287

    
1288
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1289
    _CheckOutputFields(static=[],
1290
                       dynamic=self.dynamic_fields,
1291
                       selected=self.op.output_fields)
1292

    
1293
  @staticmethod
1294
  def _DiagnoseByOS(node_list, rlist):
1295
    """Remaps a per-node return list into an a per-os per-node dictionary
1296

1297
      Args:
1298
        node_list: a list with the names of all nodes
1299
        rlist: a map with node names as keys and OS objects as values
1300

1301
      Returns:
1302
        map: a map with osnames as keys and as value another map, with
1303
             nodes as
1304
             keys and list of OS objects as values
1305
             e.g. {"debian-etch": {"node1": [<object>,...],
1306
                                   "node2": [<object>,]}
1307
                  }
1308

1309
    """
1310
    all_os = {}
1311
    for node_name, nr in rlist.iteritems():
1312
      if not nr:
1313
        continue
1314
      for os in nr:
1315
        if os.name not in all_os:
1316
          # build a list of nodes for this os containing empty lists
1317
          # for each node in node_list
1318
          all_os[os.name] = {}
1319
          for nname in node_list:
1320
            all_os[os.name][nname] = []
1321
        all_os[os.name][node_name].append(os)
1322
    return all_os
1323

    
1324
  def Exec(self, feedback_fn):
1325
    """Compute the list of OSes.
1326

1327
    """
1328
    node_list = self.cfg.GetNodeList()
1329
    node_data = rpc.call_os_diagnose(node_list)
1330
    if node_data == False:
1331
      raise errors.OpExecError("Can't gather the list of OSes")
1332
    pol = self._DiagnoseByOS(node_list, node_data)
1333
    output = []
1334
    for os_name, os_data in pol.iteritems():
1335
      row = []
1336
      for field in self.op.output_fields:
1337
        if field == "name":
1338
          val = os_name
1339
        elif field == "valid":
1340
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1341
        elif field == "node_status":
1342
          val = {}
1343
          for node_name, nos_list in os_data.iteritems():
1344
            val[node_name] = [(v.status, v.path) for v in nos_list]
1345
        else:
1346
          raise errors.ParameterError(field)
1347
        row.append(val)
1348
      output.append(row)
1349

    
1350
    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
1524

    
1525

    
1526
class LUQueryNodeVolumes(NoHooksLU):
1527
  """Logical unit for getting volumes on node(s).
1528

1529
  """
1530
  _OP_REQP = ["nodes", "output_fields"]
1531

    
1532
  def CheckPrereq(self):
1533
    """Check prerequisites.
1534

1535
    This checks that the fields required are valid output fields.
1536

1537
    """
1538
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1539

    
1540
    _CheckOutputFields(static=["node"],
1541
                       dynamic=["phys", "vg", "name", "size", "instance"],
1542
                       selected=self.op.output_fields)
1543

    
1544

    
1545
  def Exec(self, feedback_fn):
1546
    """Computes the list of nodes and their attributes.
1547

1548
    """
1549
    nodenames = self.nodes
1550
    volumes = rpc.call_node_volumes(nodenames)
1551

    
1552
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1553
             in self.cfg.GetInstanceList()]
1554

    
1555
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1556

    
1557
    output = []
1558
    for node in nodenames:
1559
      if node not in volumes or not volumes[node]:
1560
        continue
1561

    
1562
      node_vols = volumes[node][:]
1563
      node_vols.sort(key=lambda vol: vol['dev'])
1564

    
1565
      for vol in node_vols:
1566
        node_output = []
1567
        for field in self.op.output_fields:
1568
          if field == "node":
1569
            val = node
1570
          elif field == "phys":
1571
            val = vol['dev']
1572
          elif field == "vg":
1573
            val = vol['vg']
1574
          elif field == "name":
1575
            val = vol['name']
1576
          elif field == "size":
1577
            val = int(float(vol['size']))
1578
          elif field == "instance":
1579
            for inst in ilist:
1580
              if node not in lv_by_node[inst]:
1581
                continue
1582
              if vol['name'] in lv_by_node[inst][node]:
1583
                val = inst.name
1584
                break
1585
            else:
1586
              val = '-'
1587
          else:
1588
            raise errors.ParameterError(field)
1589
          node_output.append(str(val))
1590

    
1591
        output.append(node_output)
1592

    
1593
    return output
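# Illustrative sketch (not used above): the for/else in
# LUQueryNodeVolumes.Exec resolves which instance owns a logical volume on
# a node; the else branch of a for loop runs only when the loop completed
# without hitting a break.  _example_lv_owner is a hypothetical standalone
# restatement of that lookup.
def _example_lv_owner(ilist, lv_by_node, node, lv_name):
  """Return the name of the instance owning lv_name on node, or '-'."""
  for inst in ilist:
    node_lvs = lv_by_node[inst].get(node)
    if node_lvs and lv_name in node_lvs:
      return inst.name
  return "-"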
1594

    
1595

    
1596
class LUAddNode(LogicalUnit):
1597
  """Logical unit for adding node to the cluster.
1598

1599
  """
1600
  HPATH = "node-add"
1601
  HTYPE = constants.HTYPE_NODE
1602
  _OP_REQP = ["node_name"]
1603

    
1604
  def BuildHooksEnv(self):
1605
    """Build hooks env.
1606

1607
    This will run on all nodes before, and on all nodes + the new node after.
1608

1609
    """
1610
    env = {
1611
      "OP_TARGET": self.op.node_name,
1612
      "NODE_NAME": self.op.node_name,
1613
      "NODE_PIP": self.op.primary_ip,
1614
      "NODE_SIP": self.op.secondary_ip,
1615
      }
1616
    nodes_0 = self.cfg.GetNodeList()
1617
    nodes_1 = nodes_0 + [self.op.node_name, ]
1618
    return env, nodes_0, nodes_1
1619

    
1620
  def CheckPrereq(self):
1621
    """Check prerequisites.
1622

1623
    This checks:
1624
     - the new node is not already in the config
1625
     - it is resolvable
1626
     - its parameters (single/dual homed) match the cluster
1627

1628
    Any errors are signalled by raising errors.OpPrereqError.
1629

1630
    """
1631
    node_name = self.op.node_name
1632
    cfg = self.cfg
1633

    
1634
    dns_data = utils.HostInfo(node_name)
1635

    
1636
    node = dns_data.name
1637
    primary_ip = self.op.primary_ip = dns_data.ip
1638
    secondary_ip = getattr(self.op, "secondary_ip", None)
1639
    if secondary_ip is None:
1640
      secondary_ip = primary_ip
1641
    if not utils.IsValidIP(secondary_ip):
1642
      raise errors.OpPrereqError("Invalid secondary IP given")
1643
    self.op.secondary_ip = secondary_ip
1644

    
1645
    node_list = cfg.GetNodeList()
1646
    if not self.op.readd and node in node_list:
1647
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1648
                                 node)
1649
    elif self.op.readd and node not in node_list:
1650
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1651

    
1652
    for existing_node_name in node_list:
1653
      existing_node = cfg.GetNodeInfo(existing_node_name)
1654

    
1655
      if self.op.readd and node == existing_node_name:
1656
        if (existing_node.primary_ip != primary_ip or
1657
            existing_node.secondary_ip != secondary_ip):
1658
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1659
                                     " address configuration as before")
1660
        continue
1661

    
1662
      if (existing_node.primary_ip == primary_ip or
1663
          existing_node.secondary_ip == primary_ip or
1664
          existing_node.primary_ip == secondary_ip or
1665
          existing_node.secondary_ip == secondary_ip):
1666
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1667
                                   " existing node %s" % existing_node.name)
1668

    
1669
    # check that the type of the node (single versus dual homed) is the
1670
    # same as for the master
1671
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1672
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1673
    newbie_singlehomed = secondary_ip == primary_ip
1674
    if master_singlehomed != newbie_singlehomed:
1675
      if master_singlehomed:
1676
        raise errors.OpPrereqError("The master has no private ip but the"
1677
                                   " new node has one")
1678
      else:
1679
        raise errors.OpPrereqError("The master has a private ip but the"
1680
                                   " new node doesn't have one")
1681

    
1682
    # check reachability
1683
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1684
      raise errors.OpPrereqError("Node not reachable by ping")
1685

    
1686
    if not newbie_singlehomed:
1687
      # check reachability from my secondary ip to newbie's secondary ip
1688
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1689
                           source=myself.secondary_ip):
1690
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1691
                                   " based ping to noded port")
1692

    
1693
    self.new_node = objects.Node(name=node,
1694
                                 primary_ip=primary_ip,
1695
                                 secondary_ip=secondary_ip)
1696

    
1697
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1698
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1699
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1700
                                   constants.VNC_PASSWORD_FILE)
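# Illustrative sketch (assumption taken from the check above: a node is
# considered single-homed exactly when its secondary IP equals its primary
# IP).  _example_same_homing is a hypothetical predicate restating the
# master-versus-new-node comparison.
def _example_same_homing(master_node, new_primary_ip, new_secondary_ip):
  """Return True if the new node's homing matches the master's."""
  master_singlehomed = master_node.secondary_ip == master_node.primary_ip
  newbie_singlehomed = new_secondary_ip == new_primary_ip
  return master_singlehomed == newbie_singlehomed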
1701

    
1702
  def Exec(self, feedback_fn):
1703
    """Adds the new node to the cluster.
1704

1705
    """
1706
    new_node = self.new_node
1707
    node = new_node.name
1708

    
1709
    # set up inter-node password and certificate and restarts the node daemon
1710
    gntpass = self.sstore.GetNodeDaemonPassword()
1711
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1712
      raise errors.OpExecError("ganeti password corruption detected")
1713
    f = open(constants.SSL_CERT_FILE)
1714
    try:
1715
      gntpem = f.read(8192)
1716
    finally:
1717
      f.close()
1718
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1719
    # so we use this to detect an invalid certificate; as long as the
1720
    # cert doesn't contain this, the here-document will be correctly
1721
    # parsed by the shell sequence below
1722
    if re.search(r'^!EOF\.', gntpem, re.MULTILINE):
1723
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1724
    if not gntpem.endswith("\n"):
1725
      raise errors.OpExecError("PEM must end with newline")
1726
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1727

    
1728
    # and then connect with ssh to set password and start ganeti-noded
1729
    # note that all the below variables are sanitized at this point,
1730
    # either by being constants or by the checks above
1731
    ss = self.sstore
1732
    mycommand = ("umask 077 && "
1733
                 "echo '%s' > '%s' && "
1734
                 "cat > '%s' << '!EOF.' && \n"
1735
                 "%s!EOF.\n%s restart" %
1736
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1737
                  constants.SSL_CERT_FILE, gntpem,
1738
                  constants.NODE_INITD_SCRIPT))
1739

    
1740
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1741
    if result.failed:
1742
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1743
                               " output: %s" %
1744
                               (node, result.fail_reason, result.output))
1745

    
1746
    # check connectivity
1747
    time.sleep(4)
1748

    
1749
    result = rpc.call_version([node])[node]
1750
    if result:
1751
      if constants.PROTOCOL_VERSION == result:
1752
        logger.Info("communication to node %s fine, sw version %s match" %
1753
                    (node, result))
1754
      else:
1755
        raise errors.OpExecError("Version mismatch master version %s,"
1756
                                 " node version %s" %
1757
                                 (constants.PROTOCOL_VERSION, result))
1758
    else:
1759
      raise errors.OpExecError("Cannot get version from the new node")
1760

    
1761
    # setup ssh on node
1762
    logger.Info("copy ssh key to node %s" % node)
1763
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1764
    keyarray = []
1765
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1766
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1767
                priv_key, pub_key]
1768

    
1769
    for i in keyfiles:
1770
      f = open(i, 'r')
1771
      try:
1772
        keyarray.append(f.read())
1773
      finally:
1774
        f.close()
1775

    
1776
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1777
                               keyarray[3], keyarray[4], keyarray[5])
1778

    
1779
    if not result:
1780
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1781

    
1782
    # Add node to our /etc/hosts, and add key to known_hosts
1783
    _AddHostToEtcHosts(new_node.name)
1784

    
1785
    if new_node.secondary_ip != new_node.primary_ip:
1786
      if not rpc.call_node_tcp_ping(new_node.name,
1787
                                    constants.LOCALHOST_IP_ADDRESS,
1788
                                    new_node.secondary_ip,
1789
                                    constants.DEFAULT_NODED_PORT,
1790
                                    10, False):
1791
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1792
                                 " you gave (%s). Please fix and re-run this"
1793
                                 " command." % new_node.secondary_ip)
1794

    
1795
    success, msg = self.ssh.VerifyNodeHostname(node)
1796
    if not success:
1797
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1798
                               " than the one the resolver gives: %s."
1799
                               " Please fix and re-run this command." %
1800
                               (node, msg))
1801

    
1802
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1803
    # including the node just added
1804
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1805
    dist_nodes = self.cfg.GetNodeList() + [node]
1806
    if myself.name in dist_nodes:
1807
      dist_nodes.remove(myself.name)
1808

    
1809
    logger.Debug("Copying hosts and known_hosts to all nodes")
1810
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1811
      result = rpc.call_upload_file(dist_nodes, fname)
1812
      for to_node in dist_nodes:
1813
        if not result[to_node]:
1814
          logger.Error("copy of file %s to node %s failed" %
1815
                       (fname, to_node))
1816

    
1817
    to_copy = ss.GetFileList()
1818
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1819
      to_copy.append(constants.VNC_PASSWORD_FILE)
1820
    for fname in to_copy:
1821
      if not self.ssh.CopyFileToNode(node, fname):
1822
        logger.Error("could not copy file %s to node %s" % (fname, node))
1823

    
1824
    if not self.op.readd:
1825
      logger.Info("adding node %s to cluster.conf" % node)
1826
      self.cfg.AddNode(new_node)
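# Illustrative sketch of the here-document used above: the PEM data is fed
# to the remote shell between "<< '!EOF.'" and a terminating "!EOF." line,
# which is why the earlier re.search() only needs to guarantee that no line
# of the certificate starts with that sentinel.  The parameters below are
# placeholders, not the real constants.
def _example_bootstrap_command(password, pem, pass_file, cert_file, initd):
  """Build the node-bootstrap shell command (illustrative only)."""
  return ("umask 077 && "
          "echo '%s' > '%s' && "
          "cat > '%s' << '!EOF.' && \n"
          "%s!EOF.\n%s restart" %
          (password, pass_file, cert_file, pem, initd))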
1827

    
1828

    
1829
class LUMasterFailover(LogicalUnit):
1830
  """Failover the master node to the current node.
1831

1832
  This is a special LU in that it must run on a non-master node.
1833

1834
  """
1835
  HPATH = "master-failover"
1836
  HTYPE = constants.HTYPE_CLUSTER
1837
  REQ_MASTER = False
1838
  _OP_REQP = []
1839

    
1840
  def BuildHooksEnv(self):
1841
    """Build hooks env.
1842

1843
    This will run on the new master only in the pre phase, and on all
1844
    the nodes in the post phase.
1845

1846
    """
1847
    env = {
1848
      "OP_TARGET": self.new_master,
1849
      "NEW_MASTER": self.new_master,
1850
      "OLD_MASTER": self.old_master,
1851
      }
1852
    return env, [self.new_master], self.cfg.GetNodeList()
1853

    
1854
  def CheckPrereq(self):
1855
    """Check prerequisites.
1856

1857
    This checks that we are not already the master.
1858

1859
    """
1860
    self.new_master = utils.HostInfo().name
1861
    self.old_master = self.sstore.GetMasterNode()
1862

    
1863
    if self.old_master == self.new_master:
1864
      raise errors.OpPrereqError("This commands must be run on the node"
1865
                                 " where you want the new master to be."
1866
                                 " %s is already the master" %
1867
                                 self.old_master)
1868

    
1869
  def Exec(self, feedback_fn):
1870
    """Failover the master node.
1871

1872
    This command, when run on a non-master node, will cause the current
1873
    master to cease being master, and the non-master to become new
1874
    master.
1875

1876
    """
1877
    #TODO: do not rely on gethostname returning the FQDN
1878
    logger.Info("setting master to %s, old master: %s" %
1879
                (self.new_master, self.old_master))
1880

    
1881
    if not rpc.call_node_stop_master(self.old_master):
1882
      logger.Error("could disable the master role on the old master"
1883
                   " %s, please disable manually" % self.old_master)
1884

    
1885
    ss = self.sstore
1886
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1887
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1888
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1889
      logger.Error("could not distribute the new simple store master file"
1890
                   " to the other nodes, please check.")
1891

    
1892
    if not rpc.call_node_start_master(self.new_master):
1893
      logger.Error("could not start the master role on the new master"
1894
                   " %s, please check" % self.new_master)
1895
      feedback_fn("Error in activating the master IP on the new master,"
1896
                  " please fix manually.")
1897

    
1898

    
1899

    
1900
class LUQueryClusterInfo(NoHooksLU):
1901
  """Query cluster configuration.
1902

1903
  """
1904
  _OP_REQP = []
1905
  REQ_MASTER = False
1906

    
1907
  def CheckPrereq(self):
1908
    """No prerequsites needed for this LU.
1909

1910
    """
1911
    pass
1912

    
1913
  def Exec(self, feedback_fn):
1914
    """Return cluster config.
1915

1916
    """
1917
    result = {
1918
      "name": self.sstore.GetClusterName(),
1919
      "software_version": constants.RELEASE_VERSION,
1920
      "protocol_version": constants.PROTOCOL_VERSION,
1921
      "config_version": constants.CONFIG_VERSION,
1922
      "os_api_version": constants.OS_API_VERSION,
1923
      "export_version": constants.EXPORT_VERSION,
1924
      "master": self.sstore.GetMasterNode(),
1925
      "architecture": (platform.architecture()[0], platform.machine()),
1926
      }
1927

    
1928
    return result


class LUClusterCopyFile(NoHooksLU):
1932
  """Copy file to cluster.
1933

1934
  """
1935
  _OP_REQP = ["nodes", "filename"]
1936

    
1937
  def CheckPrereq(self):
1938
    """Check prerequisites.
1939

1940
    It should check that the named file exists and that the given list
1941
    of nodes is valid.
1942

1943
    """
1944
    if not os.path.exists(self.op.filename):
1945
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1946

    
1947
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1948

    
1949
  def Exec(self, feedback_fn):
1950
    """Copy a file from master to some nodes.
1951

1952
    The file named in self.op.filename is copied to the nodes selected
    via self.op.nodes, skipping the master itself; copy failures are
    logged but do not abort the operation.
1957

1958
    """
1959
    filename = self.op.filename
1960

    
1961
    myname = utils.HostInfo().name
1962

    
1963
    for node in self.nodes:
1964
      if node == myname:
1965
        continue
1966
      if not self.ssh.CopyFileToNode(node, filename):
1967
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1968

    
1969

    
1970
class LUDumpClusterConfig(NoHooksLU):
1971
  """Return a text-representation of the cluster-config.
1972

1973
  """
1974
  _OP_REQP = []
1975

    
1976
  def CheckPrereq(self):
1977
    """No prerequisites.
1978

1979
    """
1980
    pass
1981

    
1982
  def Exec(self, feedback_fn):
1983
    """Dump a representation of the cluster config to the standard output.
1984

1985
    """
1986
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
1990
  """Run a command on some nodes.
1991

1992
  """
1993
  _OP_REQP = ["command", "nodes"]
1994

    
1995
  def CheckPrereq(self):
1996
    """Check prerequisites.
1997

1998
    It checks that the given list of nodes is valid.
1999

2000
    """
2001
    self.nodes = _GetWantedNodes(self, self.op.nodes)
2002

    
2003
  def Exec(self, feedback_fn):
2004
    """Run a command on some nodes.
2005

2006
    """
2007
    # put the master at the end of the nodes list
2008
    master_node = self.sstore.GetMasterNode()
2009
    if master_node in self.nodes:
2010
      self.nodes.remove(master_node)
2011
      self.nodes.append(master_node)
2012

    
2013
    data = []
2014
    for node in self.nodes:
2015
      result = self.ssh.Run(node, "root", self.op.command)
2016
      data.append((node, result.output, result.exit_code))
2017

    
2018
    return data
2019

    
2020

    
2021
class LUActivateInstanceDisks(NoHooksLU):
2022
  """Bring up an instance's disks.
2023

2024
  """
2025
  _OP_REQP = ["instance_name"]
2026

    
2027
  def CheckPrereq(self):
2028
    """Check prerequisites.
2029

2030
    This checks that the instance is in the cluster.
2031

2032
    """
2033
    instance = self.cfg.GetInstanceInfo(
2034
      self.cfg.ExpandInstanceName(self.op.instance_name))
2035
    if instance is None:
2036
      raise errors.OpPrereqError("Instance '%s' not known" %
2037
                                 self.op.instance_name)
2038
    self.instance = instance
2039

    
2040

    
2041
  def Exec(self, feedback_fn):
2042
    """Activate the disks.
2043

2044
    """
2045
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2046
    if not disks_ok:
2047
      raise errors.OpExecError("Cannot activate block devices")
2048

    
2049
    return disks_info
2050

    
2051

    
2052
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2053
  """Prepare the block devices for an instance.
2054

2055
  This sets up the block devices on all nodes.
2056

2057
  Args:
2058
    instance: a ganeti.objects.Instance object
2059
    ignore_secondaries: if true, errors on secondary nodes won't result
2060
                        in an error return from the function
2061

2062
  Returns:
2063
    false if the operation failed
2064
    list of (host, instance_visible_name, node_visible_name) if the operation
2065
         succeeded with the mapping from node devices to instance devices
2066
  """
2067
  device_info = []
2068
  disks_ok = True
2069
  iname = instance.name
2070
  # With the two passes mechanism we try to reduce the window of
2071
  # opportunity for the race condition of switching DRBD to primary
2072
  # before handshaking occurred, but we do not eliminate it
2073

    
2074
  # The proper fix would be to wait (with some limits) until the
2075
  # connection has been made and drbd transitions from WFConnection
2076
  # into any other network-connected state (Connected, SyncTarget,
2077
  # SyncSource, etc.)
2078

    
2079
  # 1st pass, assemble on all nodes in secondary mode
2080
  for inst_disk in instance.disks:
2081
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2082
      cfg.SetDiskID(node_disk, node)
2083
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2084
      if not result:
2085
        logger.Error("could not prepare block device %s on node %s"
2086
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2087
        if not ignore_secondaries:
2088
          disks_ok = False
2089

    
2090
  # FIXME: race condition on drbd migration to primary
2091

    
2092
  # 2nd pass, do only the primary node
2093
  for inst_disk in instance.disks:
2094
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2095
      if node != instance.primary_node:
2096
        continue
2097
      cfg.SetDiskID(node_disk, node)
2098
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2099
      if not result:
2100
        logger.Error("could not prepare block device %s on node %s"
2101
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2102
        disks_ok = False
2103
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2104

    
2105
  # leave the disks configured for the primary node
2106
  # this is a workaround that would be fixed better by
2107
  # improving the logical/physical id handling
2108
  for disk in instance.disks:
2109
    cfg.SetDiskID(disk, instance.primary_node)
2110

    
2111
  return disks_ok, device_info
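# Illustrative sketch of the "proper fix" described in the comments of
# _AssembleInstanceDisks: poll the DRBD connection state, with a bound on
# the total wait, before promoting the device to primary.  get_cstate is a
# hypothetical callable returning strings such as "WFConnection" or
# "Connected"; no such RPC is used by the code above.
def _example_wait_for_drbd_connect(get_cstate, timeout=60, interval=2):
  """Poll until DRBD leaves WFConnection or the timeout expires."""
  deadline = time.time() + timeout
  while time.time() < deadline:
    if get_cstate() != "WFConnection":
      return True
    time.sleep(interval)
  return False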
2112

    
2113

    
2114
def _StartInstanceDisks(cfg, instance, force):
2115
  """Start the disks of an instance.
2116

2117
  """
2118
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2119
                                           ignore_secondaries=force)
2120
  if not disks_ok:
2121
    _ShutdownInstanceDisks(instance, cfg)
2122
    if force is not None and not force:
2123
      logger.Error("If the message above refers to a secondary node,"
2124
                   " you can retry the operation using '--force'.")
2125
    raise errors.OpExecError("Disk consistency error")
2126

    
2127

    
2128
class LUDeactivateInstanceDisks(NoHooksLU):
2129
  """Shutdown an instance's disks.
2130

2131
  """
2132
  _OP_REQP = ["instance_name"]
2133

    
2134
  def CheckPrereq(self):
2135
    """Check prerequisites.
2136

2137
    This checks that the instance is in the cluster.
2138

2139
    """
2140
    instance = self.cfg.GetInstanceInfo(
2141
      self.cfg.ExpandInstanceName(self.op.instance_name))
2142
    if instance is None:
2143
      raise errors.OpPrereqError("Instance '%s' not known" %
2144
                                 self.op.instance_name)
2145
    self.instance = instance
2146

    
2147
  def Exec(self, feedback_fn):
2148
    """Deactivate the disks
2149

2150
    """
2151
    instance = self.instance
2152
    ins_l = rpc.call_instance_list([instance.primary_node])
2153
    ins_l = ins_l[instance.primary_node]
2154
    if not type(ins_l) is list:
2155
      raise errors.OpExecError("Can't contact node '%s'" %
2156
                               instance.primary_node)
2157

    
2158
    if self.instance.name in ins_l:
2159
      raise errors.OpExecError("Instance is running, can't shutdown"
2160
                               " block devices.")
2161

    
2162
    _ShutdownInstanceDisks(instance, self.cfg)
2163

    
2164

    
2165
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2166
  """Shutdown block devices of an instance.
2167

2168
  This does the shutdown on all nodes of the instance.
2169

2170
  If ignore_primary is false, errors on the primary node are not
  ignored.
2172

2173
  """
2174
  result = True
2175
  for disk in instance.disks:
2176
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2177
      cfg.SetDiskID(top_disk, node)
2178
      if not rpc.call_blockdev_shutdown(node, top_disk):
2179
        logger.Error("could not shutdown block device %s on node %s" %
2180
                     (disk.iv_name, node))
2181
        if not ignore_primary or node != instance.primary_node:
2182
          result = False
2183
  return result
2184

    
2185

    
2186
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2187
  """Checks if a node has enough free memory.
2188

2189
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2193

2194
  Args:
2195
    - cfg: a ConfigWriter instance
2196
    - node: the node name
2197
    - reason: string to use in the error message
2198
    - requested: the amount of memory in MiB
2199

2200
  """
2201
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2202
  if not nodeinfo or not isinstance(nodeinfo, dict):
2203
    raise errors.OpPrereqError("Could not contact node %s for resource"
2204
                             " information" % (node,))
2205

    
2206
  free_mem = nodeinfo[node].get('memory_free')
2207
  if not isinstance(free_mem, int):
2208
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2209
                             " was '%s'" % (node, free_mem))
2210
  if requested > free_mem:
2211
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2212
                             " needed %s MiB, available %s MiB" %
2213
                             (node, reason, requested, free_mem))
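# Illustrative usage of _CheckNodeFreeMemory, mirroring what
# LUStartupInstance.CheckPrereq does below: the caller passes the amount of
# memory it is about to commit on the node and lets the helper raise
# OpPrereqError if the node cannot accommodate it.  _example_memory_precheck
# is hypothetical and unused.
def _example_memory_precheck(cfg, instance):
  """Fail early if the primary node lacks memory for the instance."""
  _CheckNodeFreeMemory(cfg, instance.primary_node,
                       "starting instance %s" % instance.name,
                       instance.memory)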
2214

    
2215

    
2216
class LUStartupInstance(LogicalUnit):
2217
  """Starts an instance.
2218

2219
  """
2220
  HPATH = "instance-start"
2221
  HTYPE = constants.HTYPE_INSTANCE
2222
  _OP_REQP = ["instance_name", "force"]
2223

    
2224
  def BuildHooksEnv(self):
2225
    """Build hooks env.
2226

2227
    This runs on master, primary and secondary nodes of the instance.
2228

2229
    """
2230
    env = {
2231
      "FORCE": self.op.force,
2232
      }
2233
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2234
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2235
          list(self.instance.secondary_nodes))
2236
    return env, nl, nl
2237

    
2238
  def CheckPrereq(self):
2239
    """Check prerequisites.
2240

2241
    This checks that the instance is in the cluster.
2242

2243
    """
2244
    instance = self.cfg.GetInstanceInfo(
2245
      self.cfg.ExpandInstanceName(self.op.instance_name))
2246
    if instance is None:
2247
      raise errors.OpPrereqError("Instance '%s' not known" %
2248
                                 self.op.instance_name)
2249

    
2250
    # check bridges existence
2251
    _CheckInstanceBridgesExist(instance)
2252

    
2253
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2254
                         "starting instance %s" % instance.name,
2255
                         instance.memory)
2256

    
2257
    self.instance = instance
2258
    self.op.instance_name = instance.name
2259

    
2260
  def Exec(self, feedback_fn):
2261
    """Start the instance.
2262

2263
    """
2264
    instance = self.instance
2265
    force = self.op.force
2266
    extra_args = getattr(self.op, "extra_args", "")
2267

    
2268
    self.cfg.MarkInstanceUp(instance.name)
2269

    
2270
    node_current = instance.primary_node
2271

    
2272
    _StartInstanceDisks(self.cfg, instance, force)
2273

    
2274
    if not rpc.call_instance_start(node_current, instance, extra_args):
2275
      _ShutdownInstanceDisks(instance, self.cfg)
2276
      raise errors.OpExecError("Could not start instance")
2277

    
2278

    
2279
class LURebootInstance(LogicalUnit):
2280
  """Reboot an instance.
2281

2282
  """
2283
  HPATH = "instance-reboot"
2284
  HTYPE = constants.HTYPE_INSTANCE
2285
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2286

    
2287
  def BuildHooksEnv(self):
2288
    """Build hooks env.
2289

2290
    This runs on master, primary and secondary nodes of the instance.
2291

2292
    """
2293
    env = {
2294
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2295
      }
2296
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2297
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2298
          list(self.instance.secondary_nodes))
2299
    return env, nl, nl
2300

    
2301
  def CheckPrereq(self):
2302
    """Check prerequisites.
2303

2304
    This checks that the instance is in the cluster.
2305

2306
    """
2307
    instance = self.cfg.GetInstanceInfo(
2308
      self.cfg.ExpandInstanceName(self.op.instance_name))
2309
    if instance is None:
2310
      raise errors.OpPrereqError("Instance '%s' not known" %
2311
                                 self.op.instance_name)
2312

    
2313
    # check bridges existence
2314
    _CheckInstanceBridgesExist(instance)
2315

    
2316
    self.instance = instance
2317
    self.op.instance_name = instance.name
2318

    
2319
  def Exec(self, feedback_fn):
2320
    """Reboot the instance.
2321

2322
    """
2323
    instance = self.instance
2324
    ignore_secondaries = self.op.ignore_secondaries
2325
    reboot_type = self.op.reboot_type
2326
    extra_args = getattr(self.op, "extra_args", "")
2327

    
2328
    node_current = instance.primary_node
2329

    
2330
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2331
                           constants.INSTANCE_REBOOT_HARD,
2332
                           constants.INSTANCE_REBOOT_FULL]:
2333
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2334
                                  (constants.INSTANCE_REBOOT_SOFT,
2335
                                   constants.INSTANCE_REBOOT_HARD,
2336
                                   constants.INSTANCE_REBOOT_FULL))
2337

    
2338
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2339
                       constants.INSTANCE_REBOOT_HARD]:
2340
      if not rpc.call_instance_reboot(node_current, instance,
2341
                                      reboot_type, extra_args):
2342
        raise errors.OpExecError("Could not reboot instance")
2343
    else:
2344
      if not rpc.call_instance_shutdown(node_current, instance):
2345
        raise errors.OpExecError("could not shutdown instance for full reboot")
2346
      _ShutdownInstanceDisks(instance, self.cfg)
2347
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2348
      if not rpc.call_instance_start(node_current, instance, extra_args):
2349
        _ShutdownInstanceDisks(instance, self.cfg)
2350
        raise errors.OpExecError("Could not start instance for full reboot")
2351

    
2352
    self.cfg.MarkInstanceUp(instance.name)
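# Illustrative sketch of the decision made in LURebootInstance.Exec: soft
# and hard reboots are delegated to the node in a single RPC, while a full
# reboot is decomposed on the master side into shutdown, disk restart and
# start.  _example_reboot_is_delegated is hypothetical and only restates
# that grouping.
def _example_reboot_is_delegated(reboot_type):
  """Return True if the reboot is handled by one node-side call."""
  return reboot_type in (constants.INSTANCE_REBOOT_SOFT,
                         constants.INSTANCE_REBOOT_HARD)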
2353

    
2354

    
2355
class LUShutdownInstance(LogicalUnit):
2356
  """Shutdown an instance.
2357

2358
  """
2359
  HPATH = "instance-stop"
2360
  HTYPE = constants.HTYPE_INSTANCE
2361
  _OP_REQP = ["instance_name"]
2362

    
2363
  def BuildHooksEnv(self):
2364
    """Build hooks env.
2365

2366
    This runs on master, primary and secondary nodes of the instance.
2367

2368
    """
2369
    env = _BuildInstanceHookEnvByObject(self.instance)
2370
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2371
          list(self.instance.secondary_nodes))
2372
    return env, nl, nl
2373

    
2374
  def CheckPrereq(self):
2375
    """Check prerequisites.
2376

2377
    This checks that the instance is in the cluster.
2378

2379
    """
2380
    instance = self.cfg.GetInstanceInfo(
2381
      self.cfg.ExpandInstanceName(self.op.instance_name))
2382
    if instance is None:
2383
      raise errors.OpPrereqError("Instance '%s' not known" %
2384
                                 self.op.instance_name)
2385
    self.instance = instance
2386

    
2387
  def Exec(self, feedback_fn):
2388
    """Shutdown the instance.
2389

2390
    """
2391
    instance = self.instance
2392
    node_current = instance.primary_node
2393
    self.cfg.MarkInstanceDown(instance.name)
2394
    if not rpc.call_instance_shutdown(node_current, instance):
2395
      logger.Error("could not shutdown instance")
2396

    
2397
    _ShutdownInstanceDisks(instance, self.cfg)
2398

    
2399

    
2400
class LUReinstallInstance(LogicalUnit):
2401
  """Reinstall an instance.
2402

2403
  """
2404
  HPATH = "instance-reinstall"
2405
  HTYPE = constants.HTYPE_INSTANCE
2406
  _OP_REQP = ["instance_name"]
2407

    
2408
  def BuildHooksEnv(self):
2409
    """Build hooks env.
2410

2411
    This runs on master, primary and secondary nodes of the instance.
2412

2413
    """
2414
    env = _BuildInstanceHookEnvByObject(self.instance)
2415
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2416
          list(self.instance.secondary_nodes))
2417
    return env, nl, nl
2418

    
2419
  def CheckPrereq(self):
2420
    """Check prerequisites.
2421

2422
    This checks that the instance is in the cluster and is not running.
2423

2424
    """
2425
    instance = self.cfg.GetInstanceInfo(
2426
      self.cfg.ExpandInstanceName(self.op.instance_name))
2427
    if instance is None:
2428
      raise errors.OpPrereqError("Instance '%s' not known" %
2429
                                 self.op.instance_name)
2430
    if instance.disk_template == constants.DT_DISKLESS:
2431
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2432
                                 self.op.instance_name)
2433
    if instance.status != "down":
2434
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2435
                                 self.op.instance_name)
2436
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2437
    if remote_info:
2438
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2439
                                 (self.op.instance_name,
2440
                                  instance.primary_node))
2441

    
2442
    self.op.os_type = getattr(self.op, "os_type", None)
2443
    if self.op.os_type is not None:
2444
      # OS verification
2445
      pnode = self.cfg.GetNodeInfo(
2446
        self.cfg.ExpandNodeName(instance.primary_node))
2447
      if pnode is None:
2448
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2449
                                   self.op.pnode)
2450
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2451
      if not os_obj:
2452
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2453
                                   " primary node"  % self.op.os_type)
2454

    
2455
    self.instance = instance
2456

    
2457
  def Exec(self, feedback_fn):
2458
    """Reinstall the instance.
2459

2460
    """
2461
    inst = self.instance
2462

    
2463
    if self.op.os_type is not None:
2464
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2465
      inst.os = self.op.os_type
2466
      self.cfg.AddInstance(inst)
2467

    
2468
    _StartInstanceDisks(self.cfg, inst, None)
2469
    try:
2470
      feedback_fn("Running the instance OS create scripts...")
2471
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2472
        raise errors.OpExecError("Could not install OS for instance %s"
2473
                                 " on node %s" %
2474
                                 (inst.name, inst.primary_node))
2475
    finally:
2476
      _ShutdownInstanceDisks(inst, self.cfg)
2477

    
2478

    
2479
class LURenameInstance(LogicalUnit):
2480
  """Rename an instance.
2481

2482
  """
2483
  HPATH = "instance-rename"
2484
  HTYPE = constants.HTYPE_INSTANCE
2485
  _OP_REQP = ["instance_name", "new_name"]
2486

    
2487
  def BuildHooksEnv(self):
2488
    """Build hooks env.
2489

2490
    This runs on master, primary and secondary nodes of the instance.
2491

2492
    """
2493
    env = _BuildInstanceHookEnvByObject(self.instance)
2494
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2495
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2496
          list(self.instance.secondary_nodes))
2497
    return env, nl, nl
2498

    
2499
  def CheckPrereq(self):
2500
    """Check prerequisites.
2501

2502
    This checks that the instance is in the cluster and is not running.
2503

2504
    """
2505
    instance = self.cfg.GetInstanceInfo(
2506
      self.cfg.ExpandInstanceName(self.op.instance_name))
2507
    if instance is None:
2508
      raise errors.OpPrereqError("Instance '%s' not known" %
2509
                                 self.op.instance_name)
2510
    if instance.status != "down":
2511
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2512
                                 self.op.instance_name)
2513
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2514
    if remote_info:
2515
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2516
                                 (self.op.instance_name,
2517
                                  instance.primary_node))
2518
    self.instance = instance
2519

    
2520
    # new name verification
2521
    name_info = utils.HostInfo(self.op.new_name)
2522

    
2523
    self.op.new_name = new_name = name_info.name
2524
    instance_list = self.cfg.GetInstanceList()
2525
    if new_name in instance_list:
2526
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2527
                                 new_name)
2528

    
2529
    if not getattr(self.op, "ignore_ip", False):
2530
      command = ["fping", "-q", name_info.ip]
2531
      result = utils.RunCmd(command)
2532
      if not result.failed:
2533
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2534
                                   (name_info.ip, new_name))
2535

    
2536

    
2537
  def Exec(self, feedback_fn):
2538
    """Reinstall the instance.
2539

2540
    """
2541
    inst = self.instance
2542
    old_name = inst.name
2543

    
2544
    if inst.disk_template == constants.DT_FILE:
2545
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2546

    
2547
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2548

    
2549
    # re-read the instance from the configuration after rename
2550
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2551

    
2552
    if inst.disk_template == constants.DT_FILE:
2553
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2554
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2555
                                                old_file_storage_dir,
2556
                                                new_file_storage_dir)
2557

    
2558
      if not result:
2559
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2560
                                 " directory '%s' to '%s' (but the instance"
2561
                                 " has been renamed in Ganeti)" % (
2562
                                 inst.primary_node, old_file_storage_dir,
2563
                                 new_file_storage_dir))
2564

    
2565
      if not result[0]:
2566
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2567
                                 " (but the instance has been renamed in"
2568
                                 " Ganeti)" % (old_file_storage_dir,
2569
                                               new_file_storage_dir))
2570

    
2571
    _StartInstanceDisks(self.cfg, inst, None)
2572
    try:
2573
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2574
                                          "sda", "sdb"):
2575
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2576
               " instance has been renamed in Ganeti)" %
2577
               (inst.name, inst.primary_node))
2578
        logger.Error(msg)
2579
    finally:
2580
      _ShutdownInstanceDisks(inst, self.cfg)
2581

    
2582

    
2583
class LURemoveInstance(LogicalUnit):
2584
  """Remove an instance.
2585

2586
  """
2587
  HPATH = "instance-remove"
2588
  HTYPE = constants.HTYPE_INSTANCE
2589
  _OP_REQP = ["instance_name"]
2590

    
2591
  def BuildHooksEnv(self):
2592
    """Build hooks env.
2593

2594
    This runs on master, primary and secondary nodes of the instance.
2595

2596
    """
2597
    env = _BuildInstanceHookEnvByObject(self.instance)
2598
    nl = [self.sstore.GetMasterNode()]
2599
    return env, nl, nl
2600

    
2601
  def CheckPrereq(self):
2602
    """Check prerequisites.
2603

2604
    This checks that the instance is in the cluster.
2605

2606
    """
2607
    instance = self.cfg.GetInstanceInfo(
2608
      self.cfg.ExpandInstanceName(self.op.instance_name))
2609
    if instance is None:
2610
      raise errors.OpPrereqError("Instance '%s' not known" %
2611
                                 self.op.instance_name)
2612
    self.instance = instance
2613

    
2614
  def Exec(self, feedback_fn):
2615
    """Remove the instance.
2616

2617
    """
2618
    instance = self.instance
2619
    logger.Info("shutting down instance %s on node %s" %
2620
                (instance.name, instance.primary_node))
2621

    
2622
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2623
      if self.op.ignore_failures:
2624
        feedback_fn("Warning: can't shutdown instance")
2625
      else:
2626
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2627
                                 (instance.name, instance.primary_node))
2628

    
2629
    logger.Info("removing block devices for instance %s" % instance.name)
2630

    
2631
    if not _RemoveDisks(instance, self.cfg):
2632
      if self.op.ignore_failures:
2633
        feedback_fn("Warning: can't remove instance's disks")
2634
      else:
2635
        raise errors.OpExecError("Can't remove instance's disks")
2636

    
2637
    logger.Info("removing instance %s out of cluster config" % instance.name)
2638

    
2639
    self.cfg.RemoveInstance(instance.name)
2640

    
2641

    
2642
class LUQueryInstances(NoHooksLU):
2643
  """Logical unit for querying instances.
2644

2645
  """
2646
  _OP_REQP = ["output_fields", "names"]
2647

    
2648
  def CheckPrereq(self):
2649
    """Check prerequisites.
2650

2651
    This checks that the fields required are valid output fields.
2652

2653
    """
2654
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2655
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2656
                               "admin_state", "admin_ram",
2657
                               "disk_template", "ip", "mac", "bridge",
2658
                               "sda_size", "sdb_size", "vcpus"],
2659
                       dynamic=self.dynamic_fields,
2660
                       selected=self.op.output_fields)
2661

    
2662
    self.wanted = _GetWantedInstances(self, self.op.names)
2663

    
2664
  def Exec(self, feedback_fn):
2665
    """Computes the list of nodes and their attributes.
2666

2667
    """
2668
    instance_names = self.wanted
2669
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2670
                     in instance_names]
2671

    
2672
    # begin data gathering
2673

    
2674
    nodes = frozenset([inst.primary_node for inst in instance_list])
2675

    
2676
    bad_nodes = []
2677
    if self.dynamic_fields.intersection(self.op.output_fields):
2678
      live_data = {}
2679
      node_data = rpc.call_all_instances_info(nodes)
2680
      for name in nodes:
2681
        result = node_data[name]
2682
        if result:
2683
          live_data.update(result)
2684
        elif result == False:
2685
          bad_nodes.append(name)
2686
        # else no instance is alive
2687
    else:
2688
      live_data = dict([(name, {}) for name in instance_names])
2689

    
2690
    # end data gathering
2691

    
2692
    output = []
2693
    for instance in instance_list:
2694
      iout = []
2695
      for field in self.op.output_fields:
2696
        if field == "name":
2697
          val = instance.name
2698
        elif field == "os":
2699
          val = instance.os
2700
        elif field == "pnode":
2701
          val = instance.primary_node
2702
        elif field == "snodes":
2703
          val = list(instance.secondary_nodes)
2704
        elif field == "admin_state":
2705
          val = (instance.status != "down")
2706
        elif field == "oper_state":
2707
          if instance.primary_node in bad_nodes:
2708
            val = None
2709
          else:
2710
            val = bool(live_data.get(instance.name))
2711
        elif field == "status":
2712
          if instance.primary_node in bad_nodes:
2713
            val = "ERROR_nodedown"
2714
          else:
2715
            running = bool(live_data.get(instance.name))
2716
            if running:
2717
              if instance.status != "down":
2718
                val = "running"
2719
              else:
2720
                val = "ERROR_up"
2721
            else:
2722
              if instance.status != "down":
2723
                val = "ERROR_down"
2724
              else:
2725
                val = "ADMIN_down"
2726
        elif field == "admin_ram":
2727
          val = instance.memory
2728
        elif field == "oper_ram":
2729
          if instance.primary_node in bad_nodes:
2730
            val = None
2731
          elif instance.name in live_data:
2732
            val = live_data[instance.name].get("memory", "?")
2733
          else:
2734
            val = "-"
2735
        elif field == "disk_template":
2736
          val = instance.disk_template
2737
        elif field == "ip":
2738
          val = instance.nics[0].ip
2739
        elif field == "bridge":
2740
          val = instance.nics[0].bridge
2741
        elif field == "mac":
2742
          val = instance.nics[0].mac
2743
        elif field == "sda_size" or field == "sdb_size":
2744
          disk = instance.FindDisk(field[:3])
2745
          if disk is None:
2746
            val = None
2747
          else:
2748
            val = disk.size
2749
        elif field == "vcpus":
2750
          val = instance.vcpus
2751
        else:
2752
          raise errors.ParameterError(field)
2753
        iout.append(val)
2754
      output.append(iout)
2755

    
2756
    return output
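# Illustrative restatement of the "status" field computed in
# LUQueryInstances.Exec as a pure function: the value depends only on
# whether the primary node answered, whether the instance is actually
# running there, and its configured (admin) state.  _example_instance_status
# is hypothetical and unused.
def _example_instance_status(node_bad, running, admin_up):
  """Return the status string for one instance (illustrative sketch)."""
  if node_bad:
    return "ERROR_nodedown"
  if running:
    if admin_up:
      return "running"
    return "ERROR_up"
  if admin_up:
    return "ERROR_down"
  return "ADMIN_down"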
2757

    
2758

    
2759
class LUFailoverInstance(LogicalUnit):
2760
  """Failover an instance.
2761

2762
  """
2763
  HPATH = "instance-failover"
2764
  HTYPE = constants.HTYPE_INSTANCE
2765
  _OP_REQP = ["instance_name", "ignore_consistency"]
2766

    
2767
  def BuildHooksEnv(self):
2768
    """Build hooks env.
2769

2770
    This runs on master, primary and secondary nodes of the instance.
2771

2772
    """
2773
    env = {
2774
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2775
      }
2776
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2777
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2778
    return env, nl, nl
2779

    
2780
  def CheckPrereq(self):
2781
    """Check prerequisites.
2782

2783
    This checks that the instance is in the cluster.
2784

2785
    """
2786
    instance = self.cfg.GetInstanceInfo(
2787
      self.cfg.ExpandInstanceName(self.op.instance_name))
2788
    if instance is None:
2789
      raise errors.OpPrereqError("Instance '%s' not known" %
2790
                                 self.op.instance_name)
2791

    
2792
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2793
      raise errors.OpPrereqError("Instance's disk layout is not"
2794
                                 " network mirrored, cannot failover.")
2795

    
2796
    secondary_nodes = instance.secondary_nodes
2797
    if not secondary_nodes:
2798
      raise errors.ProgrammerError("no secondary node but using "
2799
                                   "DT_REMOTE_RAID1 template")
2800

    
2801
    target_node = secondary_nodes[0]
2802
    # check memory requirements on the secondary node
2803
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2804
                         instance.name, instance.memory)
2805

    
2806
    # check bridge existence
2807
    brlist = [nic.bridge for nic in instance.nics]
2808
    if not rpc.call_bridges_exist(target_node, brlist):
2809
      raise errors.OpPrereqError("One or more target bridges %s does not"
2810
                                 " exist on destination node '%s'" %
2811
                                 (brlist, target_node))
2812

    
2813
    self.instance = instance
2814

    
2815
  def Exec(self, feedback_fn):
2816
    """Failover an instance.
2817

2818
    The failover is done by shutting it down on its present node and
2819
    starting it on the secondary.
2820

2821
    """
2822
    instance = self.instance
2823

    
2824
    source_node = instance.primary_node
2825
    target_node = instance.secondary_nodes[0]
2826

    
2827
    feedback_fn("* checking disk consistency between source and target")
2828
    for dev in instance.disks:
2829
      # for remote_raid1, these are md over drbd
2830
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2831
        if instance.status == "up" and not self.op.ignore_consistency:
2832
          raise errors.OpExecError("Disk %s is degraded on target node,"
2833
                                   " aborting failover." % dev.iv_name)
2834

    
2835
    feedback_fn("* shutting down instance on source node")
2836
    logger.Info("Shutting down instance %s on node %s" %
2837
                (instance.name, source_node))
2838

    
2839
    if not rpc.call_instance_shutdown(source_node, instance):
2840
      if self.op.ignore_consistency:
2841
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2842
                     " anyway. Please make sure node %s is down"  %
2843
                     (instance.name, source_node, source_node))
2844
      else:
2845
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2846
                                 (instance.name, source_node))
2847

    
2848
    feedback_fn("* deactivating the instance's disks on source node")
2849
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2850
      raise errors.OpExecError("Can't shut down the instance's disks.")
2851

    
2852
    instance.primary_node = target_node
2853
    # distribute new instance config to the other nodes
2854
    self.cfg.AddInstance(instance)
2855

    
2856
    # Only start the instance if it's marked as up
2857
    if instance.status == "up":
2858
      feedback_fn("* activating the instance's disks on target node")
2859
      logger.Info("Starting instance %s on node %s" %
2860
                  (instance.name, target_node))
2861

    
2862
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2863
                                               ignore_secondaries=True)
2864
      if not disks_ok:
2865
        _ShutdownInstanceDisks(instance, self.cfg)
2866
        raise errors.OpExecError("Can't activate the instance's disks")
2867

    
2868
      feedback_fn("* starting the instance on the target node")
2869
      if not rpc.call_instance_start(target_node, instance, None):
2870
        _ShutdownInstanceDisks(instance, self.cfg)
2871
        raise errors.OpExecError("Could not start instance %s on node %s." %
2872
                                 (instance.name, target_node))
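# Note on the ordering above: disk consistency is checked before the
# instance is shut down, the disks are deactivated on the old primary
# before the configuration change is distributed, and the instance is only
# restarted on the new primary if it was administratively up.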
2873

    
2874

    
2875
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2876
  """Create a tree of block devices on the primary node.
2877

2878
  This always creates all devices.
2879

2880
  """
2881
  if device.children:
2882
    for child in device.children:
2883
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2884
        return False
2885

    
2886
  cfg.SetDiskID(device, node)
2887
  new_id = rpc.call_blockdev_create(node, device, device.size,
2888
                                    instance.name, True, info)
2889
  if not new_id:
2890
    return False
2891
  if device.physical_id is None:
2892
    device.physical_id = new_id
2893
  return True
2894

    
2895

    
2896
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2897
  """Create a tree of block devices on a secondary node.
2898

2899
  If this device type has to be created on secondaries, create it and
2900
  all its children.
2901

2902
  If not, just recurse to children keeping the same 'force' value.
2903

2904
  """
2905
  if device.CreateOnSecondary():
2906
    force = True
2907
  if device.children:
2908
    for child in device.children:
2909
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2910
                                        child, force, info):
2911
        return False
2912

    
2913
  if not force:
2914
    return True
2915
  cfg.SetDiskID(device, node)
2916
  new_id = rpc.call_blockdev_create(node, device, device.size,
2917
                                    instance.name, False, info)
2918
  if not new_id:
2919
    return False
2920
  if device.physical_id is None:
2921
    device.physical_id = new_id
2922
  return True
2923

    
2924

    
2925
def _GenerateUniqueNames(cfg, exts):
2926
  """Generate a suitable LV name.
2927

2928
  This will generate a logical volume name for the given instance.
2929

2930
  """
2931
  results = []
2932
  for val in exts:
2933
    new_id = cfg.GenerateUniqueID()
2934
    results.append("%s%s" % (new_id, val))
2935
  return results
2936

    
2937

    
2938
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2939
  """Generate a drbd device complete with its children.
2940

2941
  """
2942
  port = cfg.AllocatePort()
2943
  vgname = cfg.GetVGName()
2944
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2945
                          logical_id=(vgname, names[0]))
2946
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2947
                          logical_id=(vgname, names[1]))
2948
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2949
                          logical_id = (primary, secondary, port),
2950
                          children = [dev_data, dev_meta])
2951
  return drbd_dev
2952

    
2953

    
2954
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2955
  """Generate a drbd8 device complete with its children.
2956

2957
  """
2958
  port = cfg.AllocatePort()
2959
  vgname = cfg.GetVGName()
2960
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2961
                          logical_id=(vgname, names[0]))
2962
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2963
                          logical_id=(vgname, names[1]))
2964
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2965
                          logical_id = (primary, secondary, port),
2966
                          children = [dev_data, dev_meta],
2967
                          iv_name=iv_name)
2968
  return drbd_dev
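# The device returned by _GenerateDRBD8Branch is a small tree: a DRBD8
# device whose logical_id carries the two node names and the allocated
# port, with two LV children -- the data volume of the requested size and
# a fixed size-128 metadata volume:
#
#   drbd8 (primary, secondary, port), iv_name=iv_name
#     +- lv (vgname, names[0])   data, size
#     +- lv (vgname, names[1])   metadata, 128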
2969

    
2970

    
2971
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


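# Illustrative result of _GenerateDiskTemplate for the plain (lvm-only)
# template; sizes are example values and the LV names come from
# _GenerateUniqueNames:
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_PLAIN, "inst1", "node1",
#                                 [], 10240, 4096, "", None)
#   # -> [Disk(LD_LV, 10240, iv_name="sda"), Disk(LD_LV, 4096, iv_name="sdb")]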
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


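# Example of the metadata text (instance name is illustrative):
#
#   _GetInstanceInfoText(instance)   # -> "originstname+web1.example.com"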
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]


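# Worked example for _ComputeDiskSize (illustrative sizes, in MiB): a drbd8
# instance with a 10240 MiB data disk and a 4096 MiB swap disk needs
# 10240 + 4096 + 256 = 14592 MiB of free space in the volume group, while
# diskless and file-based instances need no volume group space (None).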
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    al_data = _IAllocatorGetClusterData(self.cfg, self.sstore)
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    op = opcodes.OpTestAllocator(name=self.op.instance_name,
                                 disk_template=self.op.disk_template,
                                 tags=[],
                                 os=self.op.os_type,
                                 vcpus=self.op.vcpus,
                                 mem_size=self.op.mem_size,
                                 disks=disks,
                                 nics=nics)

    _IAllocatorAddNewInstance(al_data, op)

    if _JSON_INDENT is None:
      text = simplejson.dumps(al_data)
    else:
      text = simplejson.dumps(al_data, indent=_JSON_INDENT)

    result = _IAllocatorRun(self.op.iallocator, text)

    result = _IAllocatorValidateResult(result)

    if not result["success"]:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           result["info"]))
    req_nodes = 1
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      req_nodes += 1

    if len(result["nodes"]) != req_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator,
                                  len(result["nodes"]), req_nodes))
    self.op.pnode = result["nodes"][0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(result["nodes"]),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, result["nodes"]))
    if req_nodes == 2:
      self.op.snode = result["nodes"][1]

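  # Sketch of how _RunAllocator consumes the allocator reply (node names are
  # illustrative): for a net-mirrored template req_nodes is 2 and a reply
  # such as {"success": True, "info": "...", "nodes": ["node1", "node2"]}
  # results in self.op.pnode = "node1" and self.op.snode = "node2";
  # non-mirrored templates request a single node.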
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


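  # The hook environment assembled in BuildHooksEnv contains, roughly
  # (placeholder values; _BuildInstanceHookEnv adds the generic INSTANCE_*
  # keys on top of these):
  #
  #   {"INSTANCE_DISK_TEMPLATE": <template>, "INSTANCE_DISK_SIZE": <MiB>,
  #    "INSTANCE_SWAP_SIZE": <MiB>, "INSTANCE_ADD_MODE": <mode>, ...}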
  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
        raise errors.OpPrereqError("File storage directory not a relative"
                                   " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


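# LUConnectConsole.Exec returns the argv of an interactive ssh session to the
# instance's primary node, which in turn runs the hypervisor's console
# command.  A rough sketch of what a client could do with it (the driver
# helper name is hypothetical, not part of this module):
#
#   argv = RunLU(LUConnectConsole, op)   # hypothetical client-side helper
#   os.execvp(argv[0], argv)             # hand the terminal over to ssh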
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

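  # Sketch of the detach/rename/attach dance in _ExecD8DiskOnly for one data
  # LV (names are illustrative; <ts> is int(time.time())):
  #
  #   <old-id>.sda_data  ->  <old-id>.sda_data_replaced-<ts>   # park old LV
  #   <new-id>.sda_data  ->  <old-id>.sda_data                 # new LV takes the old name
  #   drbd children: old_lvs -> new_lvs; the *_replaced-<ts> LVs are removed
  #   only after the device has resynced.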
  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

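  # The secondary change in _ExecD8Secondary boils down to rewriting each
  # disk's logical_id and letting DRBD reconnect; illustrative before/after
  # for one disk:
  #
  #   before: (pri_node, old_secondary, port)
  #   after:  (pri_node, new_secondary, port)   # port kept via logical_id[2:]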
  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

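  # Shape of the per-device dict returned by _ComputeDiskStatus (values are
  # illustrative):
  #
  #   {"iv_name": "sda", "dev_type": constants.LD_DRBD8,
  #    "logical_id": ("node1", "node2", 11000), "physical_id": (...),
  #    "pstatus": <blockdev_find result on the primary>,
  #    "sstatus": <blockdev_find result on the secondary, or None>,
  #    "children": [<same structure for each child device>]}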
  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,
        }

      result[instance.name] = idict

    return result


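# The result of LUQueryInstanceData.Exec maps instance names to the dicts
# built above, e.g. (illustrative values):
#
#   {"inst1.example.com": {"name": "inst1.example.com", "config_state": "up",
#                          "run_state": "down", "pnode": "node1",
#                          "snodes": ["node2"], "os": "debian-etch",
#                          "memory": 512, "vcpus": 1, "disks": [...], ...}}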
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
4286
    """Check prerequisites.
4287

4288
    This only checks the instance list against the existing names.
4289

4290
    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_params.count(None) == len(all_params):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                      logical_id=(vgname, new_dev_name),
                                      physical_id=(vgname, new_dev_name),
                                      iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


def _IAllocatorGetClusterData(cfg, sstore):
  """Compute the generic allocator input data.

  This is the data that is independent of the actual operation.

  """
  # cluster data
  data = {
    "version": 1,
    "cluster_name": sstore.GetClusterName(),
    "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
    # we don't have job IDs
    }

  # node data
  node_results = {}
  node_list = cfg.GetNodeList()
  node_data = rpc.call_node_info(node_list, cfg.GetVGName())
  for nname in node_list:
    ninfo = cfg.GetNodeInfo(nname)
    if nname not in node_data or not isinstance(node_data[nname], dict):
      raise errors.OpExecError("Can't get data for node %s" % nname)
    remote_info = node_data[nname]
    for attr in ['memory_total', 'memory_free',
                 'vg_size', 'vg_free']:
      if attr not in remote_info:
        raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                 (nname, attr))
      try:
        int(remote_info[attr])
      except ValueError, err:
        raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                 " %s" % (nname, attr, str(err)))
    pnr = {
      "tags": list(ninfo.GetTags()),
      "total_memory": utils.TryConvert(int, remote_info['memory_total']),
      "free_memory": utils.TryConvert(int, remote_info['memory_free']),
      "total_disk": utils.TryConvert(int, remote_info['vg_size']),
      "free_disk": utils.TryConvert(int, remote_info['vg_free']),
      "primary_ip": ninfo.primary_ip,
      "secondary_ip": ninfo.secondary_ip,
      }
    node_results[nname] = pnr
  data["nodes"] = node_results

  # instance data
  instance_data = {}
  i_list = cfg.GetInstanceList()
  for iname in i_list:
    iinfo = cfg.GetInstanceInfo(iname)
    nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                for n in iinfo.nics]
    pir = {
      "tags": list(iinfo.GetTags()),
      "should_run": iinfo.status == "up",
      "vcpus": iinfo.vcpus,
      "memory": iinfo.memory,
      "os": iinfo.os,
      "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
      "nics": nic_data,
      "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
      "disk_template": iinfo.disk_template,
      }
    instance_data[iname] = pir

  data["instances"] = instance_data

  return data


def _IAllocatorAddNewInstance(data, op):
  """Add new instance data to allocator structure.

  This in combination with _IAllocatorGetClusterData will create the
  correct structure needed as input for the allocator.

  The checks for the completeness of the opcode must have already been
  done.

  """
  if len(op.disks) != 2:
    raise errors.OpExecError("Only two-disk configurations supported")

  disk_space = _ComputeDiskSize(op.disk_template,
                                op.disks[0]["size"], op.disks[1]["size"])

  request = {
    "type": "allocate",
    "name": op.name,
    "disk_template": op.disk_template,
    "tags": op.tags,
    "os": op.os,
    "vcpus": op.vcpus,
    "memory": op.mem_size,
    "disks": op.disks,
    "disk_space_total": disk_space,
    "nics": op.nics,
    }
  data["request"] = request


def _IAllocatorAddRelocateInstance(data, op):
  """Add relocate instance data to allocator structure.

  This in combination with _IAllocatorGetClusterData will create the
  correct structure needed as input for the allocator.

  The checks for the completeness of the opcode must have already been
  done.

  """
  request = {
    "type": "replace_secondary",
    "name": op.name,
    }
  data["request"] = request


def _IAllocatorRun(name, data):
  """Run an instance allocator and return the results.

  """
  alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                os.path.isfile)
  if alloc_script is None:
    raise errors.OpExecError("Can't find allocator '%s'" % name)

  fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
  try:
    os.write(fd, data)
    os.close(fd)
    result = utils.RunCmd([alloc_script, fin_name])
    if result.failed:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (result.fail_reason, result.stdout))
  finally:
    os.unlink(fin_name)
  return result.stdout


def _IAllocatorValidateResult(data):
  """Process the allocator results.

  """
  try:
    rdict = simplejson.loads(data)
  except Exception, err:
    raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

  if not isinstance(rdict, dict):
    raise errors.OpExecError("Can't parse iallocator results: not a dict")

  for key in "success", "info", "nodes":
    if key not in rdict:
      raise errors.OpExecError("Can't parse iallocator results:"
                               " missing key '%s'" % key)

  if not isinstance(rdict["nodes"], list):
    raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                             " is not a list")
  return rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    data = _IAllocatorGetClusterData(self.cfg, self.sstore)
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      _IAllocatorAddNewInstance(data, self.op)
    else:
      _IAllocatorAddRelocateInstance(data, self.op)

    if _JSON_INDENT is None:
      text = simplejson.dumps(data)
    else:
      text = simplejson.dumps(data, indent=_JSON_INDENT)
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = text
    else:
      result = _IAllocatorRun(self.op.allocator, text)
    return result