Chage the rapi to conform to new OpDiagnoseOS
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements (REQ_CLUSTER,
58       REQ_MASTER); note that all commands require root permissions
59
60   """
61   HPATH = None
62   HTYPE = None
63   _OP_REQP = []
64   REQ_CLUSTER = True
65   REQ_MASTER = True
66
67   def __init__(self, processor, op, cfg, sstore):
68     """Constructor for LogicalUnit.
69
70     This needs to be overriden in derived classes in order to check op
71     validity.
72
73     """
74     self.proc = processor
75     self.op = op
76     self.cfg = cfg
77     self.sstore = sstore
78     for attr_name in self._OP_REQP:
79       attr_val = getattr(op, attr_name, None)
80       if attr_val is None:
81         raise errors.OpPrereqError("Required parameter '%s' missing" %
82                                    attr_name)
83     if self.REQ_CLUSTER:
84       if not cfg.IsCluster():
85         raise errors.OpPrereqError("Cluster not initialized yet,"
86                                    " use 'gnt-cluster init' first.")
87       if self.REQ_MASTER:
88         master = sstore.GetMasterNode()
89         if master != utils.HostInfo().name:
90           raise errors.OpPrereqError("Commands must be run on the master"
91                                      " node %s" % master)
92
93   def CheckPrereq(self):
94     """Check prerequisites for this LU.
95
96     This method should check that the prerequisites for the execution
97     of this LU are fulfilled. It can do internode communication, but
98     it should be idempotent - no cluster or system changes are
99     allowed.
100
101     The method should raise errors.OpPrereqError in case something is
102     not fulfilled. Its return value is ignored.
103
104     This method should also update all the parameters of the opcode to
105     their canonical form; e.g. a short node name must be fully
106     expanded after this method has successfully completed (so that
107     hooks, logging, etc. work correctly).
108
109     """
110     raise NotImplementedError
111
112   def Exec(self, feedback_fn):
113     """Execute the LU.
114
115     This method should implement the actual work. It should raise
116     errors.OpExecError for failures that are somewhat dealt with in
117     code, or expected.
118
119     """
120     raise NotImplementedError
121
122   def BuildHooksEnv(self):
123     """Build hooks environment for this LU.
124
125     This method should return a three-node tuple consisting of: a dict
126     containing the environment that will be used for running the
127     specific hook for this LU, a list of node names on which the hook
128     should run before the execution, and a list of node names on which
129     the hook should run after the execution.
130
131     The keys of the dict must not have 'GANETI_' prefixed as this will
132     be handled in the hooks runner. Also note additional keys will be
133     added by the hooks runner. If the LU doesn't define any
134     environment, an empty dict (and not None) should be returned.
135
136     As for the node lists, the master should not be included in the
137     them, as it will be added by the hooks runner in case this LU
138     requires a cluster to run on (otherwise we don't have a node
139     list). No nodes should be returned as an empty list (and not
140     None).
141
142     Note that if the HPATH for a LU class is None, this function will
143     not be called.
144
145     """
146     raise NotImplementedError
147
148
149 class NoHooksLU(LogicalUnit):
150   """Simple LU which runs no hooks.
151
152   This LU is intended as a parent for other LogicalUnits which will
153   run no hooks, in order to reduce duplicate code.
154
155   """
156   HPATH = None
157   HTYPE = None
158
159   def BuildHooksEnv(self):
160     """Build hooks env.
161
162     This is a no-op, since we don't run hooks.
163
164     """
165     return {}, [], []
166
167
168 def _AddHostToEtcHosts(hostname):
169   """Wrapper around utils.SetEtcHostsEntry.
170
171   """
172   hi = utils.HostInfo(name=hostname)
173   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
174
175
176 def _RemoveHostFromEtcHosts(hostname):
177   """Wrapper around utils.RemoveEtcHostsEntry.
178
179   """
180   hi = utils.HostInfo(name=hostname)
181   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
182   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
183
184
185 def _GetWantedNodes(lu, nodes):
186   """Returns list of checked and expanded node names.
187
188   Args:
189     nodes: List of nodes (strings) or None for all
190
191   """
192   if not isinstance(nodes, list):
193     raise errors.OpPrereqError("Invalid argument type 'nodes'")
194
195   if nodes:
196     wanted = []
197
198     for name in nodes:
199       node = lu.cfg.ExpandNodeName(name)
200       if node is None:
201         raise errors.OpPrereqError("No such node name '%s'" % name)
202       wanted.append(node)
203
204   else:
205     wanted = lu.cfg.GetNodeList()
206   return utils.NiceSort(wanted)
207
208
209 def _GetWantedInstances(lu, instances):
210   """Returns list of checked and expanded instance names.
211
212   Args:
213     instances: List of instances (strings) or None for all
214
215   """
216   if not isinstance(instances, list):
217     raise errors.OpPrereqError("Invalid argument type 'instances'")
218
219   if instances:
220     wanted = []
221
222     for name in instances:
223       instance = lu.cfg.ExpandInstanceName(name)
224       if instance is None:
225         raise errors.OpPrereqError("No such instance name '%s'" % name)
226       wanted.append(instance)
227
228   else:
229     wanted = lu.cfg.GetInstanceList()
230   return utils.NiceSort(wanted)
231
232
233 def _CheckOutputFields(static, dynamic, selected):
234   """Checks whether all selected fields are valid.
235
236   Args:
237     static: Static fields
238     dynamic: Dynamic fields
239
240   """
241   static_fields = frozenset(static)
242   dynamic_fields = frozenset(dynamic)
243
244   all_fields = static_fields | dynamic_fields
245
246   if not all_fields.issuperset(selected):
247     raise errors.OpPrereqError("Unknown output fields selected: %s"
248                                % ",".join(frozenset(selected).
249                                           difference(all_fields)))
250
251
252 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
253                           memory, vcpus, nics):
254   """Builds instance related env variables for hooks from single variables.
255
256   Args:
257     secondary_nodes: List of secondary nodes as strings
258   """
259   env = {
260     "OP_TARGET": name,
261     "INSTANCE_NAME": name,
262     "INSTANCE_PRIMARY": primary_node,
263     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
264     "INSTANCE_OS_TYPE": os_type,
265     "INSTANCE_STATUS": status,
266     "INSTANCE_MEMORY": memory,
267     "INSTANCE_VCPUS": vcpus,
268   }
269
270   if nics:
271     nic_count = len(nics)
272     for idx, (ip, bridge, mac) in enumerate(nics):
273       if ip is None:
274         ip = ""
275       env["INSTANCE_NIC%d_IP" % idx] = ip
276       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
277       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
278   else:
279     nic_count = 0
280
281   env["INSTANCE_NIC_COUNT"] = nic_count
282
283   return env
284
285
286 def _BuildInstanceHookEnvByObject(instance, override=None):
287   """Builds instance related env variables for hooks from an object.
288
289   Args:
290     instance: objects.Instance object of instance
291     override: dict of values to override
292   """
293   args = {
294     'name': instance.name,
295     'primary_node': instance.primary_node,
296     'secondary_nodes': instance.secondary_nodes,
297     'os_type': instance.os,
298     'status': instance.os,
299     'memory': instance.memory,
300     'vcpus': instance.vcpus,
301     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
302   }
303   if override:
304     args.update(override)
305   return _BuildInstanceHookEnv(**args)
306
307
308 def _UpdateKnownHosts(fullnode, ip, pubkey):
309   """Ensure a node has a correct known_hosts entry.
310
311   Args:
312     fullnode - Fully qualified domain name of host. (str)
313     ip       - IPv4 address of host (str)
314     pubkey   - the public key of the cluster
315
316   """
317   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
318     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
319   else:
320     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
321
322   inthere = False
323
324   save_lines = []
325   add_lines = []
326   removed = False
327
328   for rawline in f:
329     logger.Debug('read %s' % (repr(rawline),))
330
331     parts = rawline.rstrip('\r\n').split()
332
333     # Ignore unwanted lines
334     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
335       fields = parts[0].split(',')
336       key = parts[2]
337
338       haveall = True
339       havesome = False
340       for spec in [ ip, fullnode ]:
341         if spec not in fields:
342           haveall = False
343         if spec in fields:
344           havesome = True
345
346       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
347       if haveall and key == pubkey:
348         inthere = True
349         save_lines.append(rawline)
350         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
351         continue
352
353       if havesome and (not haveall or key != pubkey):
354         removed = True
355         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
356         continue
357
358     save_lines.append(rawline)
359
360   if not inthere:
361     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
362     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
363
364   if removed:
365     save_lines = save_lines + add_lines
366
367     # Write a new file and replace old.
368     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
369                                    constants.DATA_DIR)
370     newfile = os.fdopen(fd, 'w')
371     try:
372       newfile.write(''.join(save_lines))
373     finally:
374       newfile.close()
375     logger.Debug("Wrote new known_hosts.")
376     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
377
378   elif add_lines:
379     # Simply appending a new line will do the trick.
380     f.seek(0, 2)
381     for add in add_lines:
382       f.write(add)
383
384   f.close()
385
386
387 def _HasValidVG(vglist, vgname):
388   """Checks if the volume group list is valid.
389
390   A non-None return value means there's an error, and the return value
391   is the error message.
392
393   """
394   vgsize = vglist.get(vgname, None)
395   if vgsize is None:
396     return "volume group '%s' missing" % vgname
397   elif vgsize < 20480:
398     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
399             (vgname, vgsize))
400   return None
401
402
403 def _InitSSHSetup(node):
404   """Setup the SSH configuration for the cluster.
405
406
407   This generates a dsa keypair for root, adds the pub key to the
408   permitted hosts and adds the hostkey to its own known hosts.
409
410   Args:
411     node: the name of this host as a fqdn
412
413   """
414   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
415
416   for name in priv_key, pub_key:
417     if os.path.exists(name):
418       utils.CreateBackup(name)
419     utils.RemoveFile(name)
420
421   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
422                          "-f", priv_key,
423                          "-q", "-N", ""])
424   if result.failed:
425     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
426                              result.output)
427
428   f = open(pub_key, 'r')
429   try:
430     utils.AddAuthorizedKey(auth_keys, f.read(8192))
431   finally:
432     f.close()
433
434
435 def _InitGanetiServerSetup(ss):
436   """Setup the necessary configuration for the initial node daemon.
437
438   This creates the nodepass file containing the shared password for
439   the cluster and also generates the SSL certificate.
440
441   """
442   # Create pseudo random password
443   randpass = sha.new(os.urandom(64)).hexdigest()
444   # and write it into sstore
445   ss.SetKey(ss.SS_NODED_PASS, randpass)
446
447   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
448                          "-days", str(365*5), "-nodes", "-x509",
449                          "-keyout", constants.SSL_CERT_FILE,
450                          "-out", constants.SSL_CERT_FILE, "-batch"])
451   if result.failed:
452     raise errors.OpExecError("could not generate server ssl cert, command"
453                              " %s had exitcode %s and error message %s" %
454                              (result.cmd, result.exit_code, result.output))
455
456   os.chmod(constants.SSL_CERT_FILE, 0400)
457
458   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
459
460   if result.failed:
461     raise errors.OpExecError("Could not start the node daemon, command %s"
462                              " had exitcode %s and error %s" %
463                              (result.cmd, result.exit_code, result.output))
464
465
466 def _CheckInstanceBridgesExist(instance):
467   """Check that the brigdes needed by an instance exist.
468
469   """
470   # check bridges existance
471   brlist = [nic.bridge for nic in instance.nics]
472   if not rpc.call_bridges_exist(instance.primary_node, brlist):
473     raise errors.OpPrereqError("one or more target bridges %s does not"
474                                " exist on destination node '%s'" %
475                                (brlist, instance.primary_node))
476
477
478 class LUInitCluster(LogicalUnit):
479   """Initialise the cluster.
480
481   """
482   HPATH = "cluster-init"
483   HTYPE = constants.HTYPE_CLUSTER
484   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
485               "def_bridge", "master_netdev"]
486   REQ_CLUSTER = False
487
488   def BuildHooksEnv(self):
489     """Build hooks env.
490
491     Notes: Since we don't require a cluster, we must manually add
492     ourselves in the post-run node list.
493
494     """
495     env = {"OP_TARGET": self.op.cluster_name}
496     return env, [], [self.hostname.name]
497
498   def CheckPrereq(self):
499     """Verify that the passed name is a valid one.
500
501     """
502     if config.ConfigWriter.IsCluster():
503       raise errors.OpPrereqError("Cluster is already initialised")
504
505     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
506       if not os.path.exists(constants.VNC_PASSWORD_FILE):
507         raise errors.OpPrereqError("Please prepare the cluster VNC"
508                                    "password file %s" %
509                                    constants.VNC_PASSWORD_FILE)
510
511     self.hostname = hostname = utils.HostInfo()
512
513     if hostname.ip.startswith("127."):
514       raise errors.OpPrereqError("This host's IP resolves to the private"
515                                  " range (%s). Please fix DNS or %s." %
516                                  (hostname.ip, constants.ETC_HOSTS))
517
518     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
519                          source=constants.LOCALHOST_IP_ADDRESS):
520       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
521                                  " to %s,\nbut this ip address does not"
522                                  " belong to this host."
523                                  " Aborting." % hostname.ip)
524
525     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
526
527     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
528                      timeout=5):
529       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
530
531     secondary_ip = getattr(self.op, "secondary_ip", None)
532     if secondary_ip and not utils.IsValidIP(secondary_ip):
533       raise errors.OpPrereqError("Invalid secondary ip given")
534     if (secondary_ip and
535         secondary_ip != hostname.ip and
536         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
537                            source=constants.LOCALHOST_IP_ADDRESS))):
538       raise errors.OpPrereqError("You gave %s as secondary IP,"
539                                  " but it does not belong to this host." %
540                                  secondary_ip)
541     self.secondary_ip = secondary_ip
542
543     # checks presence of the volume group given
544     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
545
546     if vgstatus:
547       raise errors.OpPrereqError("Error: %s" % vgstatus)
548
549     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
550                     self.op.mac_prefix):
551       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
552                                  self.op.mac_prefix)
553
554     if self.op.hypervisor_type not in constants.HYPER_TYPES:
555       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
556                                  self.op.hypervisor_type)
557
558     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
559     if result.failed:
560       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
561                                  (self.op.master_netdev,
562                                   result.output.strip()))
563
564     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
565             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
566       raise errors.OpPrereqError("Init.d script '%s' missing or not"
567                                  " executable." % constants.NODE_INITD_SCRIPT)
568
569   def Exec(self, feedback_fn):
570     """Initialize the cluster.
571
572     """
573     clustername = self.clustername
574     hostname = self.hostname
575
576     # set up the simple store
577     self.sstore = ss = ssconf.SimpleStore()
578     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
579     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
580     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
581     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
582     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
583
584     # set up the inter-node password and certificate
585     _InitGanetiServerSetup(ss)
586
587     # start the master ip
588     rpc.call_node_start_master(hostname.name)
589
590     # set up ssh config and /etc/hosts
591     f = open(constants.SSH_HOST_RSA_PUB, 'r')
592     try:
593       sshline = f.read()
594     finally:
595       f.close()
596     sshkey = sshline.split(" ")[1]
597
598     _AddHostToEtcHosts(hostname.name)
599
600     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
601
602     _InitSSHSetup(hostname.name)
603
604     # init of cluster config file
605     self.cfg = cfgw = config.ConfigWriter()
606     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
607                     sshkey, self.op.mac_prefix,
608                     self.op.vg_name, self.op.def_bridge)
609
610
611 class LUDestroyCluster(NoHooksLU):
612   """Logical unit for destroying the cluster.
613
614   """
615   _OP_REQP = []
616
617   def CheckPrereq(self):
618     """Check prerequisites.
619
620     This checks whether the cluster is empty.
621
622     Any errors are signalled by raising errors.OpPrereqError.
623
624     """
625     master = self.sstore.GetMasterNode()
626
627     nodelist = self.cfg.GetNodeList()
628     if len(nodelist) != 1 or nodelist[0] != master:
629       raise errors.OpPrereqError("There are still %d node(s) in"
630                                  " this cluster." % (len(nodelist) - 1))
631     instancelist = self.cfg.GetInstanceList()
632     if instancelist:
633       raise errors.OpPrereqError("There are still %d instance(s) in"
634                                  " this cluster." % len(instancelist))
635
636   def Exec(self, feedback_fn):
637     """Destroys the cluster.
638
639     """
640     master = self.sstore.GetMasterNode()
641     if not rpc.call_node_stop_master(master):
642       raise errors.OpExecError("Could not disable the master role")
643     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
644     utils.CreateBackup(priv_key)
645     utils.CreateBackup(pub_key)
646     rpc.call_node_leave_cluster(master)
647
648
649 class LUVerifyCluster(NoHooksLU):
650   """Verifies the cluster status.
651
652   """
653   _OP_REQP = ["skip_checks"]
654
655   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
656                   remote_version, feedback_fn):
657     """Run multiple tests against a node.
658
659     Test list:
660       - compares ganeti version
661       - checks vg existance and size > 20G
662       - checks config file checksum
663       - checks ssh to other nodes
664
665     Args:
666       node: name of the node to check
667       file_list: required list of files
668       local_cksum: dictionary of local files and their checksums
669
670     """
671     # compares ganeti version
672     local_version = constants.PROTOCOL_VERSION
673     if not remote_version:
674       feedback_fn("  - ERROR: connection to %s failed" % (node))
675       return True
676
677     if local_version != remote_version:
678       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
679                       (local_version, node, remote_version))
680       return True
681
682     # checks vg existance and size > 20G
683
684     bad = False
685     if not vglist:
686       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
687                       (node,))
688       bad = True
689     else:
690       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
691       if vgstatus:
692         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
693         bad = True
694
695     # checks config file checksum
696     # checks ssh to any
697
698     if 'filelist' not in node_result:
699       bad = True
700       feedback_fn("  - ERROR: node hasn't returned file checksum data")
701     else:
702       remote_cksum = node_result['filelist']
703       for file_name in file_list:
704         if file_name not in remote_cksum:
705           bad = True
706           feedback_fn("  - ERROR: file '%s' missing" % file_name)
707         elif remote_cksum[file_name] != local_cksum[file_name]:
708           bad = True
709           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
710
711     if 'nodelist' not in node_result:
712       bad = True
713       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
714     else:
715       if node_result['nodelist']:
716         bad = True
717         for node in node_result['nodelist']:
718           feedback_fn("  - ERROR: communication with node '%s': %s" %
719                           (node, node_result['nodelist'][node]))
720     hyp_result = node_result.get('hypervisor', None)
721     if hyp_result is not None:
722       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
723     return bad
724
725   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
726                       node_instance, feedback_fn):
727     """Verify an instance.
728
729     This function checks to see if the required block devices are
730     available on the instance's node.
731
732     """
733     bad = False
734
735     node_current = instanceconfig.primary_node
736
737     node_vol_should = {}
738     instanceconfig.MapLVsByNode(node_vol_should)
739
740     for node in node_vol_should:
741       for volume in node_vol_should[node]:
742         if node not in node_vol_is or volume not in node_vol_is[node]:
743           feedback_fn("  - ERROR: volume %s missing on node %s" %
744                           (volume, node))
745           bad = True
746
747     if not instanceconfig.status == 'down':
748       if (node_current not in node_instance or
749           not instance in node_instance[node_current]):
750         feedback_fn("  - ERROR: instance %s not running on node %s" %
751                         (instance, node_current))
752         bad = True
753
754     for node in node_instance:
755       if (not node == node_current):
756         if instance in node_instance[node]:
757           feedback_fn("  - ERROR: instance %s should not run on node %s" %
758                           (instance, node))
759           bad = True
760
761     return bad
762
763   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
764     """Verify if there are any unknown volumes in the cluster.
765
766     The .os, .swap and backup volumes are ignored. All other volumes are
767     reported as unknown.
768
769     """
770     bad = False
771
772     for node in node_vol_is:
773       for volume in node_vol_is[node]:
774         if node not in node_vol_should or volume not in node_vol_should[node]:
775           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
776                       (volume, node))
777           bad = True
778     return bad
779
780   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
781     """Verify the list of running instances.
782
783     This checks what instances are running but unknown to the cluster.
784
785     """
786     bad = False
787     for node in node_instance:
788       for runninginstance in node_instance[node]:
789         if runninginstance not in instancelist:
790           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
791                           (runninginstance, node))
792           bad = True
793     return bad
794
795   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
796     """Verify N+1 Memory Resilience.
797
798     Check that if one single node dies we can still start all the instances it
799     was primary for.
800
801     """
802     bad = False
803
804     for node, nodeinfo in node_info.iteritems():
805       # This code checks that every node which is now listed as secondary has
806       # enough memory to host all instances it is supposed to should a single
807       # other node in the cluster fail.
808       # FIXME: not ready for failover to an arbitrary node
809       # FIXME: does not support file-backed instances
810       # WARNING: we currently take into account down instances as well as up
811       # ones, considering that even if they're down someone might want to start
812       # them even in the event of a node failure.
813       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
814         needed_mem = 0
815         for instance in instances:
816           needed_mem += instance_cfg[instance].memory
817         if nodeinfo['mfree'] < needed_mem:
818           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
819                       " failovers should node %s fail" % (node, prinode))
820           bad = True
821     return bad
822
823   def CheckPrereq(self):
824     """Check prerequisites.
825
826     Transform the list of checks we're going to skip into a set and check that
827     all its members are valid.
828
829     """
830     self.skip_set = frozenset(self.op.skip_checks)
831     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
832       raise errors.OpPrereqError("Invalid checks to be skipped specified")
833
834   def Exec(self, feedback_fn):
835     """Verify integrity of cluster, performing various test on nodes.
836
837     """
838     bad = False
839     feedback_fn("* Verifying global settings")
840     for msg in self.cfg.VerifyConfig():
841       feedback_fn("  - ERROR: %s" % msg)
842
843     vg_name = self.cfg.GetVGName()
844     nodelist = utils.NiceSort(self.cfg.GetNodeList())
845     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
846     i_non_redundant = [] # Non redundant instances
847     node_volume = {}
848     node_instance = {}
849     node_info = {}
850     instance_cfg = {}
851
852     # FIXME: verify OS list
853     # do local checksums
854     file_names = list(self.sstore.GetFileList())
855     file_names.append(constants.SSL_CERT_FILE)
856     file_names.append(constants.CLUSTER_CONF_FILE)
857     local_checksums = utils.FingerprintFiles(file_names)
858
859     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
860     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
861     all_instanceinfo = rpc.call_instance_list(nodelist)
862     all_vglist = rpc.call_vg_list(nodelist)
863     node_verify_param = {
864       'filelist': file_names,
865       'nodelist': nodelist,
866       'hypervisor': None,
867       }
868     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
869     all_rversion = rpc.call_version(nodelist)
870     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
871
872     for node in nodelist:
873       feedback_fn("* Verifying node %s" % node)
874       result = self._VerifyNode(node, file_names, local_checksums,
875                                 all_vglist[node], all_nvinfo[node],
876                                 all_rversion[node], feedback_fn)
877       bad = bad or result
878
879       # node_volume
880       volumeinfo = all_volumeinfo[node]
881
882       if isinstance(volumeinfo, basestring):
883         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
884                     (node, volumeinfo[-400:].encode('string_escape')))
885         bad = True
886         node_volume[node] = {}
887       elif not isinstance(volumeinfo, dict):
888         feedback_fn("  - ERROR: connection to %s failed" % (node,))
889         bad = True
890         continue
891       else:
892         node_volume[node] = volumeinfo
893
894       # node_instance
895       nodeinstance = all_instanceinfo[node]
896       if type(nodeinstance) != list:
897         feedback_fn("  - ERROR: connection to %s failed" % (node,))
898         bad = True
899         continue
900
901       node_instance[node] = nodeinstance
902
903       # node_info
904       nodeinfo = all_ninfo[node]
905       if not isinstance(nodeinfo, dict):
906         feedback_fn("  - ERROR: connection to %s failed" % (node,))
907         bad = True
908         continue
909
910       try:
911         node_info[node] = {
912           "mfree": int(nodeinfo['memory_free']),
913           "dfree": int(nodeinfo['vg_free']),
914           "pinst": [],
915           "sinst": [],
916           # dictionary holding all instances this node is secondary for,
917           # grouped by their primary node. Each key is a cluster node, and each
918           # value is a list of instances which have the key as primary and the
919           # current node as secondary.  this is handy to calculate N+1 memory
920           # availability if you can only failover from a primary to its
921           # secondary.
922           "sinst-by-pnode": {},
923         }
924       except ValueError:
925         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
926         bad = True
927         continue
928
929     node_vol_should = {}
930
931     for instance in instancelist:
932       feedback_fn("* Verifying instance %s" % instance)
933       inst_config = self.cfg.GetInstanceInfo(instance)
934       result =  self._VerifyInstance(instance, inst_config, node_volume,
935                                      node_instance, feedback_fn)
936       bad = bad or result
937
938       inst_config.MapLVsByNode(node_vol_should)
939
940       instance_cfg[instance] = inst_config
941
942       pnode = inst_config.primary_node
943       if pnode in node_info:
944         node_info[pnode]['pinst'].append(instance)
945       else:
946         feedback_fn("  - ERROR: instance %s, connection to primary node"
947                     " %s failed" % (instance, pnode))
948         bad = True
949
950       # If the instance is non-redundant we cannot survive losing its primary
951       # node, so we are not N+1 compliant. On the other hand we have no disk
952       # templates with more than one secondary so that situation is not well
953       # supported either.
954       # FIXME: does not support file-backed instances
955       if len(inst_config.secondary_nodes) == 0:
956         i_non_redundant.append(instance)
957       elif len(inst_config.secondary_nodes) > 1:
958         feedback_fn("  - WARNING: multiple secondaries for instance %s"
959                     % instance)
960
961       for snode in inst_config.secondary_nodes:
962         if snode in node_info:
963           node_info[snode]['sinst'].append(instance)
964           if pnode not in node_info[snode]['sinst-by-pnode']:
965             node_info[snode]['sinst-by-pnode'][pnode] = []
966           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
967         else:
968           feedback_fn("  - ERROR: instance %s, connection to secondary node"
969                       " %s failed" % (instance, snode))
970
971     feedback_fn("* Verifying orphan volumes")
972     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
973                                        feedback_fn)
974     bad = bad or result
975
976     feedback_fn("* Verifying remaining instances")
977     result = self._VerifyOrphanInstances(instancelist, node_instance,
978                                          feedback_fn)
979     bad = bad or result
980
981     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
982       feedback_fn("* Verifying N+1 Memory redundancy")
983       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
984       bad = bad or result
985
986     feedback_fn("* Other Notes")
987     if i_non_redundant:
988       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
989                   % len(i_non_redundant))
990
991     return int(bad)
992
993
994 class LUVerifyDisks(NoHooksLU):
995   """Verifies the cluster disks status.
996
997   """
998   _OP_REQP = []
999
1000   def CheckPrereq(self):
1001     """Check prerequisites.
1002
1003     This has no prerequisites.
1004
1005     """
1006     pass
1007
1008   def Exec(self, feedback_fn):
1009     """Verify integrity of cluster disks.
1010
1011     """
1012     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1013
1014     vg_name = self.cfg.GetVGName()
1015     nodes = utils.NiceSort(self.cfg.GetNodeList())
1016     instances = [self.cfg.GetInstanceInfo(name)
1017                  for name in self.cfg.GetInstanceList()]
1018
1019     nv_dict = {}
1020     for inst in instances:
1021       inst_lvs = {}
1022       if (inst.status != "up" or
1023           inst.disk_template not in constants.DTS_NET_MIRROR):
1024         continue
1025       inst.MapLVsByNode(inst_lvs)
1026       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1027       for node, vol_list in inst_lvs.iteritems():
1028         for vol in vol_list:
1029           nv_dict[(node, vol)] = inst
1030
1031     if not nv_dict:
1032       return result
1033
1034     node_lvs = rpc.call_volume_list(nodes, vg_name)
1035
1036     to_act = set()
1037     for node in nodes:
1038       # node_volume
1039       lvs = node_lvs[node]
1040
1041       if isinstance(lvs, basestring):
1042         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1043         res_nlvm[node] = lvs
1044       elif not isinstance(lvs, dict):
1045         logger.Info("connection to node %s failed or invalid data returned" %
1046                     (node,))
1047         res_nodes.append(node)
1048         continue
1049
1050       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1051         inst = nv_dict.pop((node, lv_name), None)
1052         if (not lv_online and inst is not None
1053             and inst.name not in res_instances):
1054           res_instances.append(inst.name)
1055
1056     # any leftover items in nv_dict are missing LVs, let's arrange the
1057     # data better
1058     for key, inst in nv_dict.iteritems():
1059       if inst.name not in res_missing:
1060         res_missing[inst.name] = []
1061       res_missing[inst.name].append(key)
1062
1063     return result
1064
1065
1066 class LURenameCluster(LogicalUnit):
1067   """Rename the cluster.
1068
1069   """
1070   HPATH = "cluster-rename"
1071   HTYPE = constants.HTYPE_CLUSTER
1072   _OP_REQP = ["name"]
1073
1074   def BuildHooksEnv(self):
1075     """Build hooks env.
1076
1077     """
1078     env = {
1079       "OP_TARGET": self.sstore.GetClusterName(),
1080       "NEW_NAME": self.op.name,
1081       }
1082     mn = self.sstore.GetMasterNode()
1083     return env, [mn], [mn]
1084
1085   def CheckPrereq(self):
1086     """Verify that the passed name is a valid one.
1087
1088     """
1089     hostname = utils.HostInfo(self.op.name)
1090
1091     new_name = hostname.name
1092     self.ip = new_ip = hostname.ip
1093     old_name = self.sstore.GetClusterName()
1094     old_ip = self.sstore.GetMasterIP()
1095     if new_name == old_name and new_ip == old_ip:
1096       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1097                                  " cluster has changed")
1098     if new_ip != old_ip:
1099       result = utils.RunCmd(["fping", "-q", new_ip])
1100       if not result.failed:
1101         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1102                                    " reachable on the network. Aborting." %
1103                                    new_ip)
1104
1105     self.op.name = new_name
1106
1107   def Exec(self, feedback_fn):
1108     """Rename the cluster.
1109
1110     """
1111     clustername = self.op.name
1112     ip = self.ip
1113     ss = self.sstore
1114
1115     # shutdown the master IP
1116     master = ss.GetMasterNode()
1117     if not rpc.call_node_stop_master(master):
1118       raise errors.OpExecError("Could not disable the master role")
1119
1120     try:
1121       # modify the sstore
1122       ss.SetKey(ss.SS_MASTER_IP, ip)
1123       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1124
1125       # Distribute updated ss config to all nodes
1126       myself = self.cfg.GetNodeInfo(master)
1127       dist_nodes = self.cfg.GetNodeList()
1128       if myself.name in dist_nodes:
1129         dist_nodes.remove(myself.name)
1130
1131       logger.Debug("Copying updated ssconf data to all nodes")
1132       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1133         fname = ss.KeyToFilename(keyname)
1134         result = rpc.call_upload_file(dist_nodes, fname)
1135         for to_node in dist_nodes:
1136           if not result[to_node]:
1137             logger.Error("copy of file %s to node %s failed" %
1138                          (fname, to_node))
1139     finally:
1140       if not rpc.call_node_start_master(master):
1141         logger.Error("Could not re-enable the master role on the master,"
1142                      " please restart manually.")
1143
1144
1145 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1146   """Sleep and poll for an instance's disk to sync.
1147
1148   """
1149   if not instance.disks:
1150     return True
1151
1152   if not oneshot:
1153     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1154
1155   node = instance.primary_node
1156
1157   for dev in instance.disks:
1158     cfgw.SetDiskID(dev, node)
1159
1160   retries = 0
1161   while True:
1162     max_time = 0
1163     done = True
1164     cumul_degraded = False
1165     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1166     if not rstats:
1167       proc.LogWarning("Can't get any data from node %s" % node)
1168       retries += 1
1169       if retries >= 10:
1170         raise errors.RemoteError("Can't contact node %s for mirror data,"
1171                                  " aborting." % node)
1172       time.sleep(6)
1173       continue
1174     retries = 0
1175     for i in range(len(rstats)):
1176       mstat = rstats[i]
1177       if mstat is None:
1178         proc.LogWarning("Can't compute data for node %s/%s" %
1179                         (node, instance.disks[i].iv_name))
1180         continue
1181       # we ignore the ldisk parameter
1182       perc_done, est_time, is_degraded, _ = mstat
1183       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1184       if perc_done is not None:
1185         done = False
1186         if est_time is not None:
1187           rem_time = "%d estimated seconds remaining" % est_time
1188           max_time = est_time
1189         else:
1190           rem_time = "no time estimate"
1191         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1192                      (instance.disks[i].iv_name, perc_done, rem_time))
1193     if done or oneshot:
1194       break
1195
1196     if unlock:
1197       utils.Unlock('cmd')
1198     try:
1199       time.sleep(min(60, max_time))
1200     finally:
1201       if unlock:
1202         utils.Lock('cmd')
1203
1204   if done:
1205     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1206   return not cumul_degraded
1207
1208
1209 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1210   """Check that mirrors are not degraded.
1211
1212   The ldisk parameter, if True, will change the test from the
1213   is_degraded attribute (which represents overall non-ok status for
1214   the device(s)) to the ldisk (representing the local storage status).
1215
1216   """
1217   cfgw.SetDiskID(dev, node)
1218   if ldisk:
1219     idx = 6
1220   else:
1221     idx = 5
1222
1223   result = True
1224   if on_primary or dev.AssembleOnSecondary():
1225     rstats = rpc.call_blockdev_find(node, dev)
1226     if not rstats:
1227       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1228       result = False
1229     else:
1230       result = result and (not rstats[idx])
1231   if dev.children:
1232     for child in dev.children:
1233       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1234
1235   return result
1236
1237
1238 class LUDiagnoseOS(NoHooksLU):
1239   """Logical unit for OS diagnose/query.
1240
1241   """
1242   _OP_REQP = ["output_fields", "names"]
1243
1244   def CheckPrereq(self):
1245     """Check prerequisites.
1246
1247     This always succeeds, since this is a pure query LU.
1248
1249     """
1250     if self.op.names:
1251       raise errors.OpPrereqError("Selective OS query not supported")
1252
1253     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1254     _CheckOutputFields(static=[],
1255                        dynamic=self.dynamic_fields,
1256                        selected=self.op.output_fields)
1257
1258   @staticmethod
1259   def _DiagnoseByOS(node_list, rlist):
1260     """Remaps a per-node return list into an a per-os per-node dictionary
1261
1262       Args:
1263         node_list: a list with the names of all nodes
1264         rlist: a map with node names as keys and OS objects as values
1265
1266       Returns:
1267         map: a map with osnames as keys and as value another map, with
1268              nodes as
1269              keys and list of OS objects as values
1270              e.g. {"debian-etch": {"node1": [<object>,...],
1271                                    "node2": [<object>,]}
1272                   }
1273
1274     """
1275     all_os = {}
1276     for node_name, nr in rlist.iteritems():
1277       if not nr:
1278         continue
1279       for os in nr:
1280         if os.name not in all_os:
1281           # build a list of nodes for this os containing empty lists
1282           # for each node in node_list
1283           all_os[os.name] = {}
1284           for nname in node_list:
1285             all_os[os.name][nname] = []
1286         all_os[os.name][node_name].append(os)
1287     return all_os
1288
1289   def Exec(self, feedback_fn):
1290     """Compute the list of OSes.
1291
1292     """
1293     node_list = self.cfg.GetNodeList()
1294     node_data = rpc.call_os_diagnose(node_list)
1295     if node_data == False:
1296       raise errors.OpExecError("Can't gather the list of OSes")
1297     pol = self._DiagnoseByOS(node_list, node_data)
1298     output = []
1299     for os_name, os_data in pol.iteritems():
1300       row = []
1301       for field in self.op.output_fields:
1302         if field == "name":
1303           val = os_name
1304         elif field == "valid":
1305           val = utils.all([osl and osl[0] for osl in os_data.values()])
1306         elif field == "node_status":
1307           val = {}
1308           for node_name, nos_list in os_data.iteritems():
1309             val[node_name] = [(v.status, v.path) for v in nos_list]
1310         else:
1311           raise errors.ParameterError(field)
1312         row.append(val)
1313       output.append(row)
1314
1315     return output
1316
1317
1318 class LURemoveNode(LogicalUnit):
1319   """Logical unit for removing a node.
1320
1321   """
1322   HPATH = "node-remove"
1323   HTYPE = constants.HTYPE_NODE
1324   _OP_REQP = ["node_name"]
1325
1326   def BuildHooksEnv(self):
1327     """Build hooks env.
1328
1329     This doesn't run on the target node in the pre phase as a failed
1330     node would not allows itself to run.
1331
1332     """
1333     env = {
1334       "OP_TARGET": self.op.node_name,
1335       "NODE_NAME": self.op.node_name,
1336       }
1337     all_nodes = self.cfg.GetNodeList()
1338     all_nodes.remove(self.op.node_name)
1339     return env, all_nodes, all_nodes
1340
1341   def CheckPrereq(self):
1342     """Check prerequisites.
1343
1344     This checks:
1345      - the node exists in the configuration
1346      - it does not have primary or secondary instances
1347      - it's not the master
1348
1349     Any errors are signalled by raising errors.OpPrereqError.
1350
1351     """
1352     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1353     if node is None:
1354       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1355
1356     instance_list = self.cfg.GetInstanceList()
1357
1358     masternode = self.sstore.GetMasterNode()
1359     if node.name == masternode:
1360       raise errors.OpPrereqError("Node is the master node,"
1361                                  " you need to failover first.")
1362
1363     for instance_name in instance_list:
1364       instance = self.cfg.GetInstanceInfo(instance_name)
1365       if node.name == instance.primary_node:
1366         raise errors.OpPrereqError("Instance %s still running on the node,"
1367                                    " please remove first." % instance_name)
1368       if node.name in instance.secondary_nodes:
1369         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1370                                    " please remove first." % instance_name)
1371     self.op.node_name = node.name
1372     self.node = node
1373
1374   def Exec(self, feedback_fn):
1375     """Removes the node from the cluster.
1376
1377     """
1378     node = self.node
1379     logger.Info("stopping the node daemon and removing configs from node %s" %
1380                 node.name)
1381
1382     rpc.call_node_leave_cluster(node.name)
1383
1384     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1385
1386     logger.Info("Removing node %s from config" % node.name)
1387
1388     self.cfg.RemoveNode(node.name)
1389
1390     _RemoveHostFromEtcHosts(node.name)
1391
1392
1393 class LUQueryNodes(NoHooksLU):
1394   """Logical unit for querying nodes.
1395
1396   """
1397   _OP_REQP = ["output_fields", "names"]
1398
1399   def CheckPrereq(self):
1400     """Check prerequisites.
1401
1402     This checks that the fields required are valid output fields.
1403
1404     """
1405     self.dynamic_fields = frozenset(["dtotal", "dfree",
1406                                      "mtotal", "mnode", "mfree",
1407                                      "bootid"])
1408
1409     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1410                                "pinst_list", "sinst_list",
1411                                "pip", "sip"],
1412                        dynamic=self.dynamic_fields,
1413                        selected=self.op.output_fields)
1414
1415     self.wanted = _GetWantedNodes(self, self.op.names)
1416
1417   def Exec(self, feedback_fn):
1418     """Computes the list of nodes and their attributes.
1419
1420     """
1421     nodenames = self.wanted
1422     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1423
1424     # begin data gathering
1425
1426     if self.dynamic_fields.intersection(self.op.output_fields):
1427       live_data = {}
1428       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1429       for name in nodenames:
1430         nodeinfo = node_data.get(name, None)
1431         if nodeinfo:
1432           live_data[name] = {
1433             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1434             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1435             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1436             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1437             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1438             "bootid": nodeinfo['bootid'],
1439             }
1440         else:
1441           live_data[name] = {}
1442     else:
1443       live_data = dict.fromkeys(nodenames, {})
1444
1445     node_to_primary = dict([(name, set()) for name in nodenames])
1446     node_to_secondary = dict([(name, set()) for name in nodenames])
1447
1448     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1449                              "sinst_cnt", "sinst_list"))
1450     if inst_fields & frozenset(self.op.output_fields):
1451       instancelist = self.cfg.GetInstanceList()
1452
1453       for instance_name in instancelist:
1454         inst = self.cfg.GetInstanceInfo(instance_name)
1455         if inst.primary_node in node_to_primary:
1456           node_to_primary[inst.primary_node].add(inst.name)
1457         for secnode in inst.secondary_nodes:
1458           if secnode in node_to_secondary:
1459             node_to_secondary[secnode].add(inst.name)
1460
1461     # end data gathering
1462
1463     output = []
1464     for node in nodelist:
1465       node_output = []
1466       for field in self.op.output_fields:
1467         if field == "name":
1468           val = node.name
1469         elif field == "pinst_list":
1470           val = list(node_to_primary[node.name])
1471         elif field == "sinst_list":
1472           val = list(node_to_secondary[node.name])
1473         elif field == "pinst_cnt":
1474           val = len(node_to_primary[node.name])
1475         elif field == "sinst_cnt":
1476           val = len(node_to_secondary[node.name])
1477         elif field == "pip":
1478           val = node.primary_ip
1479         elif field == "sip":
1480           val = node.secondary_ip
1481         elif field in self.dynamic_fields:
1482           val = live_data[node.name].get(field, None)
1483         else:
1484           raise errors.ParameterError(field)
1485         node_output.append(val)
1486       output.append(node_output)
1487
1488     return output
1489
1490
1491 class LUQueryNodeVolumes(NoHooksLU):
1492   """Logical unit for getting volumes on node(s).
1493
1494   """
1495   _OP_REQP = ["nodes", "output_fields"]
1496
1497   def CheckPrereq(self):
1498     """Check prerequisites.
1499
1500     This checks that the fields required are valid output fields.
1501
1502     """
1503     self.nodes = _GetWantedNodes(self, self.op.nodes)
1504
1505     _CheckOutputFields(static=["node"],
1506                        dynamic=["phys", "vg", "name", "size", "instance"],
1507                        selected=self.op.output_fields)
1508
1509
1510   def Exec(self, feedback_fn):
1511     """Computes the list of nodes and their attributes.
1512
1513     """
1514     nodenames = self.nodes
1515     volumes = rpc.call_node_volumes(nodenames)
1516
1517     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1518              in self.cfg.GetInstanceList()]
1519
1520     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1521
1522     output = []
1523     for node in nodenames:
1524       if node not in volumes or not volumes[node]:
1525         continue
1526
1527       node_vols = volumes[node][:]
1528       node_vols.sort(key=lambda vol: vol['dev'])
1529
1530       for vol in node_vols:
1531         node_output = []
1532         for field in self.op.output_fields:
1533           if field == "node":
1534             val = node
1535           elif field == "phys":
1536             val = vol['dev']
1537           elif field == "vg":
1538             val = vol['vg']
1539           elif field == "name":
1540             val = vol['name']
1541           elif field == "size":
1542             val = int(float(vol['size']))
1543           elif field == "instance":
1544             for inst in ilist:
1545               if node not in lv_by_node[inst]:
1546                 continue
1547               if vol['name'] in lv_by_node[inst][node]:
1548                 val = inst.name
1549                 break
1550             else:
1551               val = '-'
1552           else:
1553             raise errors.ParameterError(field)
1554           node_output.append(str(val))
1555
1556         output.append(node_output)
1557
1558     return output
1559
1560
1561 class LUAddNode(LogicalUnit):
1562   """Logical unit for adding node to the cluster.
1563
1564   """
1565   HPATH = "node-add"
1566   HTYPE = constants.HTYPE_NODE
1567   _OP_REQP = ["node_name"]
1568
1569   def BuildHooksEnv(self):
1570     """Build hooks env.
1571
1572     This will run on all nodes before, and on all nodes + the new node after.
1573
1574     """
1575     env = {
1576       "OP_TARGET": self.op.node_name,
1577       "NODE_NAME": self.op.node_name,
1578       "NODE_PIP": self.op.primary_ip,
1579       "NODE_SIP": self.op.secondary_ip,
1580       }
1581     nodes_0 = self.cfg.GetNodeList()
1582     nodes_1 = nodes_0 + [self.op.node_name, ]
1583     return env, nodes_0, nodes_1
1584
1585   def CheckPrereq(self):
1586     """Check prerequisites.
1587
1588     This checks:
1589      - the new node is not already in the config
1590      - it is resolvable
1591      - its parameters (single/dual homed) matches the cluster
1592
1593     Any errors are signalled by raising errors.OpPrereqError.
1594
1595     """
1596     node_name = self.op.node_name
1597     cfg = self.cfg
1598
1599     dns_data = utils.HostInfo(node_name)
1600
1601     node = dns_data.name
1602     primary_ip = self.op.primary_ip = dns_data.ip
1603     secondary_ip = getattr(self.op, "secondary_ip", None)
1604     if secondary_ip is None:
1605       secondary_ip = primary_ip
1606     if not utils.IsValidIP(secondary_ip):
1607       raise errors.OpPrereqError("Invalid secondary IP given")
1608     self.op.secondary_ip = secondary_ip
1609     node_list = cfg.GetNodeList()
1610     if node in node_list:
1611       raise errors.OpPrereqError("Node %s is already in the configuration"
1612                                  % node)
1613
1614     for existing_node_name in node_list:
1615       existing_node = cfg.GetNodeInfo(existing_node_name)
1616       if (existing_node.primary_ip == primary_ip or
1617           existing_node.secondary_ip == primary_ip or
1618           existing_node.primary_ip == secondary_ip or
1619           existing_node.secondary_ip == secondary_ip):
1620         raise errors.OpPrereqError("New node ip address(es) conflict with"
1621                                    " existing node %s" % existing_node.name)
1622
1623     # check that the type of the node (single versus dual homed) is the
1624     # same as for the master
1625     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1626     master_singlehomed = myself.secondary_ip == myself.primary_ip
1627     newbie_singlehomed = secondary_ip == primary_ip
1628     if master_singlehomed != newbie_singlehomed:
1629       if master_singlehomed:
1630         raise errors.OpPrereqError("The master has no private ip but the"
1631                                    " new node has one")
1632       else:
1633         raise errors.OpPrereqError("The master has a private ip but the"
1634                                    " new node doesn't have one")
1635
1636     # checks reachablity
1637     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1638       raise errors.OpPrereqError("Node not reachable by ping")
1639
1640     if not newbie_singlehomed:
1641       # check reachability from my secondary ip to newbie's secondary ip
1642       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1643                            source=myself.secondary_ip):
1644         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1645                                    " based ping to noded port")
1646
1647     self.new_node = objects.Node(name=node,
1648                                  primary_ip=primary_ip,
1649                                  secondary_ip=secondary_ip)
1650
1651     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1652       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1653         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1654                                    constants.VNC_PASSWORD_FILE)
1655
1656   def Exec(self, feedback_fn):
1657     """Adds the new node to the cluster.
1658
1659     """
1660     new_node = self.new_node
1661     node = new_node.name
1662
1663     # set up inter-node password and certificate and restarts the node daemon
1664     gntpass = self.sstore.GetNodeDaemonPassword()
1665     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1666       raise errors.OpExecError("ganeti password corruption detected")
1667     f = open(constants.SSL_CERT_FILE)
1668     try:
1669       gntpem = f.read(8192)
1670     finally:
1671       f.close()
1672     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1673     # so we use this to detect an invalid certificate; as long as the
1674     # cert doesn't contain this, the here-document will be correctly
1675     # parsed by the shell sequence below
1676     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1677       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1678     if not gntpem.endswith("\n"):
1679       raise errors.OpExecError("PEM must end with newline")
1680     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1681
1682     # and then connect with ssh to set password and start ganeti-noded
1683     # note that all the below variables are sanitized at this point,
1684     # either by being constants or by the checks above
1685     ss = self.sstore
1686     mycommand = ("umask 077 && "
1687                  "echo '%s' > '%s' && "
1688                  "cat > '%s' << '!EOF.' && \n"
1689                  "%s!EOF.\n%s restart" %
1690                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1691                   constants.SSL_CERT_FILE, gntpem,
1692                   constants.NODE_INITD_SCRIPT))
1693
1694     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1695     if result.failed:
1696       raise errors.OpExecError("Remote command on node %s, error: %s,"
1697                                " output: %s" %
1698                                (node, result.fail_reason, result.output))
1699
1700     # check connectivity
1701     time.sleep(4)
1702
1703     result = rpc.call_version([node])[node]
1704     if result:
1705       if constants.PROTOCOL_VERSION == result:
1706         logger.Info("communication to node %s fine, sw version %s match" %
1707                     (node, result))
1708       else:
1709         raise errors.OpExecError("Version mismatch master version %s,"
1710                                  " node version %s" %
1711                                  (constants.PROTOCOL_VERSION, result))
1712     else:
1713       raise errors.OpExecError("Cannot get version from the new node")
1714
1715     # setup ssh on node
1716     logger.Info("copy ssh key to node %s" % node)
1717     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1718     keyarray = []
1719     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1720                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1721                 priv_key, pub_key]
1722
1723     for i in keyfiles:
1724       f = open(i, 'r')
1725       try:
1726         keyarray.append(f.read())
1727       finally:
1728         f.close()
1729
1730     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1731                                keyarray[3], keyarray[4], keyarray[5])
1732
1733     if not result:
1734       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1735
1736     # Add node to our /etc/hosts, and add key to known_hosts
1737     _AddHostToEtcHosts(new_node.name)
1738
1739     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1740                       self.cfg.GetHostKey())
1741
1742     if new_node.secondary_ip != new_node.primary_ip:
1743       if not rpc.call_node_tcp_ping(new_node.name,
1744                                     constants.LOCALHOST_IP_ADDRESS,
1745                                     new_node.secondary_ip,
1746                                     constants.DEFAULT_NODED_PORT,
1747                                     10, False):
1748         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1749                                  " you gave (%s). Please fix and re-run this"
1750                                  " command." % new_node.secondary_ip)
1751
1752     success, msg = ssh.VerifyNodeHostname(node)
1753     if not success:
1754       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1755                                " than the one the resolver gives: %s."
1756                                " Please fix and re-run this command." %
1757                                (node, msg))
1758
1759     # Distribute updated /etc/hosts and known_hosts to all nodes,
1760     # including the node just added
1761     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1762     dist_nodes = self.cfg.GetNodeList() + [node]
1763     if myself.name in dist_nodes:
1764       dist_nodes.remove(myself.name)
1765
1766     logger.Debug("Copying hosts and known_hosts to all nodes")
1767     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1768       result = rpc.call_upload_file(dist_nodes, fname)
1769       for to_node in dist_nodes:
1770         if not result[to_node]:
1771           logger.Error("copy of file %s to node %s failed" %
1772                        (fname, to_node))
1773
1774     to_copy = ss.GetFileList()
1775     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1776       to_copy.append(constants.VNC_PASSWORD_FILE)
1777     for fname in to_copy:
1778       if not ssh.CopyFileToNode(node, fname):
1779         logger.Error("could not copy file %s to node %s" % (fname, node))
1780
1781     logger.Info("adding node %s to cluster.conf" % node)
1782     self.cfg.AddNode(new_node)
1783
1784
1785 class LUMasterFailover(LogicalUnit):
1786   """Failover the master node to the current node.
1787
1788   This is a special LU in that it must run on a non-master node.
1789
1790   """
1791   HPATH = "master-failover"
1792   HTYPE = constants.HTYPE_CLUSTER
1793   REQ_MASTER = False
1794   _OP_REQP = []
1795
1796   def BuildHooksEnv(self):
1797     """Build hooks env.
1798
1799     This will run on the new master only in the pre phase, and on all
1800     the nodes in the post phase.
1801
1802     """
1803     env = {
1804       "OP_TARGET": self.new_master,
1805       "NEW_MASTER": self.new_master,
1806       "OLD_MASTER": self.old_master,
1807       }
1808     return env, [self.new_master], self.cfg.GetNodeList()
1809
1810   def CheckPrereq(self):
1811     """Check prerequisites.
1812
1813     This checks that we are not already the master.
1814
1815     """
1816     self.new_master = utils.HostInfo().name
1817     self.old_master = self.sstore.GetMasterNode()
1818
1819     if self.old_master == self.new_master:
1820       raise errors.OpPrereqError("This commands must be run on the node"
1821                                  " where you want the new master to be."
1822                                  " %s is already the master" %
1823                                  self.old_master)
1824
1825   def Exec(self, feedback_fn):
1826     """Failover the master node.
1827
1828     This command, when run on a non-master node, will cause the current
1829     master to cease being master, and the non-master to become new
1830     master.
1831
1832     """
1833     #TODO: do not rely on gethostname returning the FQDN
1834     logger.Info("setting master to %s, old master: %s" %
1835                 (self.new_master, self.old_master))
1836
1837     if not rpc.call_node_stop_master(self.old_master):
1838       logger.Error("could disable the master role on the old master"
1839                    " %s, please disable manually" % self.old_master)
1840
1841     ss = self.sstore
1842     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1843     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1844                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1845       logger.Error("could not distribute the new simple store master file"
1846                    " to the other nodes, please check.")
1847
1848     if not rpc.call_node_start_master(self.new_master):
1849       logger.Error("could not start the master role on the new master"
1850                    " %s, please check" % self.new_master)
1851       feedback_fn("Error in activating the master IP on the new master,"
1852                   " please fix manually.")
1853
1854
1855
1856 class LUQueryClusterInfo(NoHooksLU):
1857   """Query cluster configuration.
1858
1859   """
1860   _OP_REQP = []
1861   REQ_MASTER = False
1862
1863   def CheckPrereq(self):
1864     """No prerequsites needed for this LU.
1865
1866     """
1867     pass
1868
1869   def Exec(self, feedback_fn):
1870     """Return cluster config.
1871
1872     """
1873     result = {
1874       "name": self.sstore.GetClusterName(),
1875       "software_version": constants.RELEASE_VERSION,
1876       "protocol_version": constants.PROTOCOL_VERSION,
1877       "config_version": constants.CONFIG_VERSION,
1878       "os_api_version": constants.OS_API_VERSION,
1879       "export_version": constants.EXPORT_VERSION,
1880       "master": self.sstore.GetMasterNode(),
1881       "architecture": (platform.architecture()[0], platform.machine()),
1882       }
1883
1884     return result
1885
1886
1887 class LUClusterCopyFile(NoHooksLU):
1888   """Copy file to cluster.
1889
1890   """
1891   _OP_REQP = ["nodes", "filename"]
1892
1893   def CheckPrereq(self):
1894     """Check prerequisites.
1895
1896     It should check that the named file exists and that the given list
1897     of nodes is valid.
1898
1899     """
1900     if not os.path.exists(self.op.filename):
1901       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1902
1903     self.nodes = _GetWantedNodes(self, self.op.nodes)
1904
1905   def Exec(self, feedback_fn):
1906     """Copy a file from master to some nodes.
1907
1908     Args:
1909       opts - class with options as members
1910       args - list containing a single element, the file name
1911     Opts used:
1912       nodes - list containing the name of target nodes; if empty, all nodes
1913
1914     """
1915     filename = self.op.filename
1916
1917     myname = utils.HostInfo().name
1918
1919     for node in self.nodes:
1920       if node == myname:
1921         continue
1922       if not ssh.CopyFileToNode(node, filename):
1923         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1924
1925
1926 class LUDumpClusterConfig(NoHooksLU):
1927   """Return a text-representation of the cluster-config.
1928
1929   """
1930   _OP_REQP = []
1931
1932   def CheckPrereq(self):
1933     """No prerequisites.
1934
1935     """
1936     pass
1937
1938   def Exec(self, feedback_fn):
1939     """Dump a representation of the cluster config to the standard output.
1940
1941     """
1942     return self.cfg.DumpConfig()
1943
1944
1945 class LURunClusterCommand(NoHooksLU):
1946   """Run a command on some nodes.
1947
1948   """
1949   _OP_REQP = ["command", "nodes"]
1950
1951   def CheckPrereq(self):
1952     """Check prerequisites.
1953
1954     It checks that the given list of nodes is valid.
1955
1956     """
1957     self.nodes = _GetWantedNodes(self, self.op.nodes)
1958
1959   def Exec(self, feedback_fn):
1960     """Run a command on some nodes.
1961
1962     """
1963     # put the master at the end of the nodes list
1964     master_node = self.sstore.GetMasterNode()
1965     if master_node in self.nodes:
1966       self.nodes.remove(master_node)
1967       self.nodes.append(master_node)
1968
1969     data = []
1970     for node in self.nodes:
1971       result = ssh.SSHCall(node, "root", self.op.command)
1972       data.append((node, result.output, result.exit_code))
1973
1974     return data
1975
1976
1977 class LUActivateInstanceDisks(NoHooksLU):
1978   """Bring up an instance's disks.
1979
1980   """
1981   _OP_REQP = ["instance_name"]
1982
1983   def CheckPrereq(self):
1984     """Check prerequisites.
1985
1986     This checks that the instance is in the cluster.
1987
1988     """
1989     instance = self.cfg.GetInstanceInfo(
1990       self.cfg.ExpandInstanceName(self.op.instance_name))
1991     if instance is None:
1992       raise errors.OpPrereqError("Instance '%s' not known" %
1993                                  self.op.instance_name)
1994     self.instance = instance
1995
1996
1997   def Exec(self, feedback_fn):
1998     """Activate the disks.
1999
2000     """
2001     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2002     if not disks_ok:
2003       raise errors.OpExecError("Cannot activate block devices")
2004
2005     return disks_info
2006
2007
2008 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2009   """Prepare the block devices for an instance.
2010
2011   This sets up the block devices on all nodes.
2012
2013   Args:
2014     instance: a ganeti.objects.Instance object
2015     ignore_secondaries: if true, errors on secondary nodes won't result
2016                         in an error return from the function
2017
2018   Returns:
2019     false if the operation failed
2020     list of (host, instance_visible_name, node_visible_name) if the operation
2021          suceeded with the mapping from node devices to instance devices
2022   """
2023   device_info = []
2024   disks_ok = True
2025   iname = instance.name
2026   # With the two passes mechanism we try to reduce the window of
2027   # opportunity for the race condition of switching DRBD to primary
2028   # before handshaking occured, but we do not eliminate it
2029
2030   # The proper fix would be to wait (with some limits) until the
2031   # connection has been made and drbd transitions from WFConnection
2032   # into any other network-connected state (Connected, SyncTarget,
2033   # SyncSource, etc.)
2034
2035   # 1st pass, assemble on all nodes in secondary mode
2036   for inst_disk in instance.disks:
2037     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2038       cfg.SetDiskID(node_disk, node)
2039       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2040       if not result:
2041         logger.Error("could not prepare block device %s on node %s"
2042                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2043         if not ignore_secondaries:
2044           disks_ok = False
2045
2046   # FIXME: race condition on drbd migration to primary
2047
2048   # 2nd pass, do only the primary node
2049   for inst_disk in instance.disks:
2050     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2051       if node != instance.primary_node:
2052         continue
2053       cfg.SetDiskID(node_disk, node)
2054       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2055       if not result:
2056         logger.Error("could not prepare block device %s on node %s"
2057                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2058         disks_ok = False
2059     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2060
2061   # leave the disks configured for the primary node
2062   # this is a workaround that would be fixed better by
2063   # improving the logical/physical id handling
2064   for disk in instance.disks:
2065     cfg.SetDiskID(disk, instance.primary_node)
2066
2067   return disks_ok, device_info
2068
2069
2070 def _StartInstanceDisks(cfg, instance, force):
2071   """Start the disks of an instance.
2072
2073   """
2074   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2075                                            ignore_secondaries=force)
2076   if not disks_ok:
2077     _ShutdownInstanceDisks(instance, cfg)
2078     if force is not None and not force:
2079       logger.Error("If the message above refers to a secondary node,"
2080                    " you can retry the operation using '--force'.")
2081     raise errors.OpExecError("Disk consistency error")
2082
2083
2084 class LUDeactivateInstanceDisks(NoHooksLU):
2085   """Shutdown an instance's disks.
2086
2087   """
2088   _OP_REQP = ["instance_name"]
2089
2090   def CheckPrereq(self):
2091     """Check prerequisites.
2092
2093     This checks that the instance is in the cluster.
2094
2095     """
2096     instance = self.cfg.GetInstanceInfo(
2097       self.cfg.ExpandInstanceName(self.op.instance_name))
2098     if instance is None:
2099       raise errors.OpPrereqError("Instance '%s' not known" %
2100                                  self.op.instance_name)
2101     self.instance = instance
2102
2103   def Exec(self, feedback_fn):
2104     """Deactivate the disks
2105
2106     """
2107     instance = self.instance
2108     ins_l = rpc.call_instance_list([instance.primary_node])
2109     ins_l = ins_l[instance.primary_node]
2110     if not type(ins_l) is list:
2111       raise errors.OpExecError("Can't contact node '%s'" %
2112                                instance.primary_node)
2113
2114     if self.instance.name in ins_l:
2115       raise errors.OpExecError("Instance is running, can't shutdown"
2116                                " block devices.")
2117
2118     _ShutdownInstanceDisks(instance, self.cfg)
2119
2120
2121 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2122   """Shutdown block devices of an instance.
2123
2124   This does the shutdown on all nodes of the instance.
2125
2126   If the ignore_primary is false, errors on the primary node are
2127   ignored.
2128
2129   """
2130   result = True
2131   for disk in instance.disks:
2132     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2133       cfg.SetDiskID(top_disk, node)
2134       if not rpc.call_blockdev_shutdown(node, top_disk):
2135         logger.Error("could not shutdown block device %s on node %s" %
2136                      (disk.iv_name, node))
2137         if not ignore_primary or node != instance.primary_node:
2138           result = False
2139   return result
2140
2141
2142 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2143   """Checks if a node has enough free memory.
2144
2145   This function check if a given node has the needed amount of free
2146   memory. In case the node has less memory or we cannot get the
2147   information from the node, this function raise an OpPrereqError
2148   exception.
2149
2150   Args:
2151     - cfg: a ConfigWriter instance
2152     - node: the node name
2153     - reason: string to use in the error message
2154     - requested: the amount of memory in MiB
2155
2156   """
2157   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2158   if not nodeinfo or not isinstance(nodeinfo, dict):
2159     raise errors.OpPrereqError("Could not contact node %s for resource"
2160                              " information" % (node,))
2161
2162   free_mem = nodeinfo[node].get('memory_free')
2163   if not isinstance(free_mem, int):
2164     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2165                              " was '%s'" % (node, free_mem))
2166   if requested > free_mem:
2167     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2168                              " needed %s MiB, available %s MiB" %
2169                              (node, reason, requested, free_mem))
2170
2171
2172 class LUStartupInstance(LogicalUnit):
2173   """Starts an instance.
2174
2175   """
2176   HPATH = "instance-start"
2177   HTYPE = constants.HTYPE_INSTANCE
2178   _OP_REQP = ["instance_name", "force"]
2179
2180   def BuildHooksEnv(self):
2181     """Build hooks env.
2182
2183     This runs on master, primary and secondary nodes of the instance.
2184
2185     """
2186     env = {
2187       "FORCE": self.op.force,
2188       }
2189     env.update(_BuildInstanceHookEnvByObject(self.instance))
2190     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2191           list(self.instance.secondary_nodes))
2192     return env, nl, nl
2193
2194   def CheckPrereq(self):
2195     """Check prerequisites.
2196
2197     This checks that the instance is in the cluster.
2198
2199     """
2200     instance = self.cfg.GetInstanceInfo(
2201       self.cfg.ExpandInstanceName(self.op.instance_name))
2202     if instance is None:
2203       raise errors.OpPrereqError("Instance '%s' not known" %
2204                                  self.op.instance_name)
2205
2206     # check bridges existance
2207     _CheckInstanceBridgesExist(instance)
2208
2209     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2210                          "starting instance %s" % instance.name,
2211                          instance.memory)
2212
2213     self.instance = instance
2214     self.op.instance_name = instance.name
2215
2216   def Exec(self, feedback_fn):
2217     """Start the instance.
2218
2219     """
2220     instance = self.instance
2221     force = self.op.force
2222     extra_args = getattr(self.op, "extra_args", "")
2223
2224     self.cfg.MarkInstanceUp(instance.name)
2225
2226     node_current = instance.primary_node
2227
2228     _StartInstanceDisks(self.cfg, instance, force)
2229
2230     if not rpc.call_instance_start(node_current, instance, extra_args):
2231       _ShutdownInstanceDisks(instance, self.cfg)
2232       raise errors.OpExecError("Could not start instance")
2233
2234
2235 class LURebootInstance(LogicalUnit):
2236   """Reboot an instance.
2237
2238   """
2239   HPATH = "instance-reboot"
2240   HTYPE = constants.HTYPE_INSTANCE
2241   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2242
2243   def BuildHooksEnv(self):
2244     """Build hooks env.
2245
2246     This runs on master, primary and secondary nodes of the instance.
2247
2248     """
2249     env = {
2250       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2251       }
2252     env.update(_BuildInstanceHookEnvByObject(self.instance))
2253     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2254           list(self.instance.secondary_nodes))
2255     return env, nl, nl
2256
2257   def CheckPrereq(self):
2258     """Check prerequisites.
2259
2260     This checks that the instance is in the cluster.
2261
2262     """
2263     instance = self.cfg.GetInstanceInfo(
2264       self.cfg.ExpandInstanceName(self.op.instance_name))
2265     if instance is None:
2266       raise errors.OpPrereqError("Instance '%s' not known" %
2267                                  self.op.instance_name)
2268
2269     # check bridges existance
2270     _CheckInstanceBridgesExist(instance)
2271
2272     self.instance = instance
2273     self.op.instance_name = instance.name
2274
2275   def Exec(self, feedback_fn):
2276     """Reboot the instance.
2277
2278     """
2279     instance = self.instance
2280     ignore_secondaries = self.op.ignore_secondaries
2281     reboot_type = self.op.reboot_type
2282     extra_args = getattr(self.op, "extra_args", "")
2283
2284     node_current = instance.primary_node
2285
2286     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2287                            constants.INSTANCE_REBOOT_HARD,
2288                            constants.INSTANCE_REBOOT_FULL]:
2289       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2290                                   (constants.INSTANCE_REBOOT_SOFT,
2291                                    constants.INSTANCE_REBOOT_HARD,
2292                                    constants.INSTANCE_REBOOT_FULL))
2293
2294     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2295                        constants.INSTANCE_REBOOT_HARD]:
2296       if not rpc.call_instance_reboot(node_current, instance,
2297                                       reboot_type, extra_args):
2298         raise errors.OpExecError("Could not reboot instance")
2299     else:
2300       if not rpc.call_instance_shutdown(node_current, instance):
2301         raise errors.OpExecError("could not shutdown instance for full reboot")
2302       _ShutdownInstanceDisks(instance, self.cfg)
2303       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2304       if not rpc.call_instance_start(node_current, instance, extra_args):
2305         _ShutdownInstanceDisks(instance, self.cfg)
2306         raise errors.OpExecError("Could not start instance for full reboot")
2307
2308     self.cfg.MarkInstanceUp(instance.name)
2309
2310
2311 class LUShutdownInstance(LogicalUnit):
2312   """Shutdown an instance.
2313
2314   """
2315   HPATH = "instance-stop"
2316   HTYPE = constants.HTYPE_INSTANCE
2317   _OP_REQP = ["instance_name"]
2318
2319   def BuildHooksEnv(self):
2320     """Build hooks env.
2321
2322     This runs on master, primary and secondary nodes of the instance.
2323
2324     """
2325     env = _BuildInstanceHookEnvByObject(self.instance)
2326     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2327           list(self.instance.secondary_nodes))
2328     return env, nl, nl
2329
2330   def CheckPrereq(self):
2331     """Check prerequisites.
2332
2333     This checks that the instance is in the cluster.
2334
2335     """
2336     instance = self.cfg.GetInstanceInfo(
2337       self.cfg.ExpandInstanceName(self.op.instance_name))
2338     if instance is None:
2339       raise errors.OpPrereqError("Instance '%s' not known" %
2340                                  self.op.instance_name)
2341     self.instance = instance
2342
2343   def Exec(self, feedback_fn):
2344     """Shutdown the instance.
2345
2346     """
2347     instance = self.instance
2348     node_current = instance.primary_node
2349     self.cfg.MarkInstanceDown(instance.name)
2350     if not rpc.call_instance_shutdown(node_current, instance):
2351       logger.Error("could not shutdown instance")
2352
2353     _ShutdownInstanceDisks(instance, self.cfg)
2354
2355
2356 class LUReinstallInstance(LogicalUnit):
2357   """Reinstall an instance.
2358
2359   """
2360   HPATH = "instance-reinstall"
2361   HTYPE = constants.HTYPE_INSTANCE
2362   _OP_REQP = ["instance_name"]
2363
2364   def BuildHooksEnv(self):
2365     """Build hooks env.
2366
2367     This runs on master, primary and secondary nodes of the instance.
2368
2369     """
2370     env = _BuildInstanceHookEnvByObject(self.instance)
2371     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2372           list(self.instance.secondary_nodes))
2373     return env, nl, nl
2374
2375   def CheckPrereq(self):
2376     """Check prerequisites.
2377
2378     This checks that the instance is in the cluster and is not running.
2379
2380     """
2381     instance = self.cfg.GetInstanceInfo(
2382       self.cfg.ExpandInstanceName(self.op.instance_name))
2383     if instance is None:
2384       raise errors.OpPrereqError("Instance '%s' not known" %
2385                                  self.op.instance_name)
2386     if instance.disk_template == constants.DT_DISKLESS:
2387       raise errors.OpPrereqError("Instance '%s' has no disks" %
2388                                  self.op.instance_name)
2389     if instance.status != "down":
2390       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2391                                  self.op.instance_name)
2392     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2393     if remote_info:
2394       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2395                                  (self.op.instance_name,
2396                                   instance.primary_node))
2397
2398     self.op.os_type = getattr(self.op, "os_type", None)
2399     if self.op.os_type is not None:
2400       # OS verification
2401       pnode = self.cfg.GetNodeInfo(
2402         self.cfg.ExpandNodeName(instance.primary_node))
2403       if pnode is None:
2404         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2405                                    self.op.pnode)
2406       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2407       if not os_obj:
2408         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2409                                    " primary node"  % self.op.os_type)
2410
2411     self.instance = instance
2412
2413   def Exec(self, feedback_fn):
2414     """Reinstall the instance.
2415
2416     """
2417     inst = self.instance
2418
2419     if self.op.os_type is not None:
2420       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2421       inst.os = self.op.os_type
2422       self.cfg.AddInstance(inst)
2423
2424     _StartInstanceDisks(self.cfg, inst, None)
2425     try:
2426       feedback_fn("Running the instance OS create scripts...")
2427       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2428         raise errors.OpExecError("Could not install OS for instance %s"
2429                                  " on node %s" %
2430                                  (inst.name, inst.primary_node))
2431     finally:
2432       _ShutdownInstanceDisks(inst, self.cfg)
2433
2434
2435 class LURenameInstance(LogicalUnit):
2436   """Rename an instance.
2437
2438   """
2439   HPATH = "instance-rename"
2440   HTYPE = constants.HTYPE_INSTANCE
2441   _OP_REQP = ["instance_name", "new_name"]
2442
2443   def BuildHooksEnv(self):
2444     """Build hooks env.
2445
2446     This runs on master, primary and secondary nodes of the instance.
2447
2448     """
2449     env = _BuildInstanceHookEnvByObject(self.instance)
2450     env["INSTANCE_NEW_NAME"] = self.op.new_name
2451     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2452           list(self.instance.secondary_nodes))
2453     return env, nl, nl
2454
2455   def CheckPrereq(self):
2456     """Check prerequisites.
2457
2458     This checks that the instance is in the cluster and is not running.
2459
2460     """
2461     instance = self.cfg.GetInstanceInfo(
2462       self.cfg.ExpandInstanceName(self.op.instance_name))
2463     if instance is None:
2464       raise errors.OpPrereqError("Instance '%s' not known" %
2465                                  self.op.instance_name)
2466     if instance.status != "down":
2467       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2468                                  self.op.instance_name)
2469     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2470     if remote_info:
2471       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2472                                  (self.op.instance_name,
2473                                   instance.primary_node))
2474     self.instance = instance
2475
2476     # new name verification
2477     name_info = utils.HostInfo(self.op.new_name)
2478
2479     self.op.new_name = new_name = name_info.name
2480     instance_list = self.cfg.GetInstanceList()
2481     if new_name in instance_list:
2482       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2483                                  new_name)
2484
2485     if not getattr(self.op, "ignore_ip", False):
2486       command = ["fping", "-q", name_info.ip]
2487       result = utils.RunCmd(command)
2488       if not result.failed:
2489         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2490                                    (name_info.ip, new_name))
2491
2492
2493   def Exec(self, feedback_fn):
2494     """Reinstall the instance.
2495
2496     """
2497     inst = self.instance
2498     old_name = inst.name
2499
2500     self.cfg.RenameInstance(inst.name, self.op.new_name)
2501
2502     # re-read the instance from the configuration after rename
2503     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2504
2505     _StartInstanceDisks(self.cfg, inst, None)
2506     try:
2507       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2508                                           "sda", "sdb"):
2509         msg = ("Could run OS rename script for instance %s on node %s (but the"
2510                " instance has been renamed in Ganeti)" %
2511                (inst.name, inst.primary_node))
2512         logger.Error(msg)
2513     finally:
2514       _ShutdownInstanceDisks(inst, self.cfg)
2515
2516
2517 class LURemoveInstance(LogicalUnit):
2518   """Remove an instance.
2519
2520   """
2521   HPATH = "instance-remove"
2522   HTYPE = constants.HTYPE_INSTANCE
2523   _OP_REQP = ["instance_name"]
2524
2525   def BuildHooksEnv(self):
2526     """Build hooks env.
2527
2528     This runs on master, primary and secondary nodes of the instance.
2529
2530     """
2531     env = _BuildInstanceHookEnvByObject(self.instance)
2532     nl = [self.sstore.GetMasterNode()]
2533     return env, nl, nl
2534
2535   def CheckPrereq(self):
2536     """Check prerequisites.
2537
2538     This checks that the instance is in the cluster.
2539
2540     """
2541     instance = self.cfg.GetInstanceInfo(
2542       self.cfg.ExpandInstanceName(self.op.instance_name))
2543     if instance is None:
2544       raise errors.OpPrereqError("Instance '%s' not known" %
2545                                  self.op.instance_name)
2546     self.instance = instance
2547
2548   def Exec(self, feedback_fn):
2549     """Remove the instance.
2550
2551     """
2552     instance = self.instance
2553     logger.Info("shutting down instance %s on node %s" %
2554                 (instance.name, instance.primary_node))
2555
2556     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2557       if self.op.ignore_failures:
2558         feedback_fn("Warning: can't shutdown instance")
2559       else:
2560         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2561                                  (instance.name, instance.primary_node))
2562
2563     logger.Info("removing block devices for instance %s" % instance.name)
2564
2565     if not _RemoveDisks(instance, self.cfg):
2566       if self.op.ignore_failures:
2567         feedback_fn("Warning: can't remove instance's disks")
2568       else:
2569         raise errors.OpExecError("Can't remove instance's disks")
2570
2571     logger.Info("removing instance %s out of cluster config" % instance.name)
2572
2573     self.cfg.RemoveInstance(instance.name)
2574
2575
2576 class LUQueryInstances(NoHooksLU):
2577   """Logical unit for querying instances.
2578
2579   """
2580   _OP_REQP = ["output_fields", "names"]
2581
2582   def CheckPrereq(self):
2583     """Check prerequisites.
2584
2585     This checks that the fields required are valid output fields.
2586
2587     """
2588     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2589     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2590                                "admin_state", "admin_ram",
2591                                "disk_template", "ip", "mac", "bridge",
2592                                "sda_size", "sdb_size", "vcpus"],
2593                        dynamic=self.dynamic_fields,
2594                        selected=self.op.output_fields)
2595
2596     self.wanted = _GetWantedInstances(self, self.op.names)
2597
2598   def Exec(self, feedback_fn):
2599     """Computes the list of nodes and their attributes.
2600
2601     """
2602     instance_names = self.wanted
2603     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2604                      in instance_names]
2605
2606     # begin data gathering
2607
2608     nodes = frozenset([inst.primary_node for inst in instance_list])
2609
2610     bad_nodes = []
2611     if self.dynamic_fields.intersection(self.op.output_fields):
2612       live_data = {}
2613       node_data = rpc.call_all_instances_info(nodes)
2614       for name in nodes:
2615         result = node_data[name]
2616         if result:
2617           live_data.update(result)
2618         elif result == False:
2619           bad_nodes.append(name)
2620         # else no instance is alive
2621     else:
2622       live_data = dict([(name, {}) for name in instance_names])
2623
2624     # end data gathering
2625
2626     output = []
2627     for instance in instance_list:
2628       iout = []
2629       for field in self.op.output_fields:
2630         if field == "name":
2631           val = instance.name
2632         elif field == "os":
2633           val = instance.os
2634         elif field == "pnode":
2635           val = instance.primary_node
2636         elif field == "snodes":
2637           val = list(instance.secondary_nodes)
2638         elif field == "admin_state":
2639           val = (instance.status != "down")
2640         elif field == "oper_state":
2641           if instance.primary_node in bad_nodes:
2642             val = None
2643           else:
2644             val = bool(live_data.get(instance.name))
2645         elif field == "status":
2646           if instance.primary_node in bad_nodes:
2647             val = "ERROR_nodedown"
2648           else:
2649             running = bool(live_data.get(instance.name))
2650             if running:
2651               if instance.status != "down":
2652                 val = "running"
2653               else:
2654                 val = "ERROR_up"
2655             else:
2656               if instance.status != "down":
2657                 val = "ERROR_down"
2658               else:
2659                 val = "ADMIN_down"
2660         elif field == "admin_ram":
2661           val = instance.memory
2662         elif field == "oper_ram":
2663           if instance.primary_node in bad_nodes:
2664             val = None
2665           elif instance.name in live_data:
2666             val = live_data[instance.name].get("memory", "?")
2667           else:
2668             val = "-"
2669         elif field == "disk_template":
2670           val = instance.disk_template
2671         elif field == "ip":
2672           val = instance.nics[0].ip
2673         elif field == "bridge":
2674           val = instance.nics[0].bridge
2675         elif field == "mac":
2676           val = instance.nics[0].mac
2677         elif field == "sda_size" or field == "sdb_size":
2678           disk = instance.FindDisk(field[:3])
2679           if disk is None:
2680             val = None
2681           else:
2682             val = disk.size
2683         elif field == "vcpus":
2684           val = instance.vcpus
2685         else:
2686           raise errors.ParameterError(field)
2687         iout.append(val)
2688       output.append(iout)
2689
2690     return output
2691
2692
2693 class LUFailoverInstance(LogicalUnit):
2694   """Failover an instance.
2695
2696   """
2697   HPATH = "instance-failover"
2698   HTYPE = constants.HTYPE_INSTANCE
2699   _OP_REQP = ["instance_name", "ignore_consistency"]
2700
2701   def BuildHooksEnv(self):
2702     """Build hooks env.
2703
2704     This runs on master, primary and secondary nodes of the instance.
2705
2706     """
2707     env = {
2708       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2709       }
2710     env.update(_BuildInstanceHookEnvByObject(self.instance))
2711     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2712     return env, nl, nl
2713
2714   def CheckPrereq(self):
2715     """Check prerequisites.
2716
2717     This checks that the instance is in the cluster.
2718
2719     """
2720     instance = self.cfg.GetInstanceInfo(
2721       self.cfg.ExpandInstanceName(self.op.instance_name))
2722     if instance is None:
2723       raise errors.OpPrereqError("Instance '%s' not known" %
2724                                  self.op.instance_name)
2725
2726     if instance.disk_template not in constants.DTS_NET_MIRROR:
2727       raise errors.OpPrereqError("Instance's disk layout is not"
2728                                  " network mirrored, cannot failover.")
2729
2730     secondary_nodes = instance.secondary_nodes
2731     if not secondary_nodes:
2732       raise errors.ProgrammerError("no secondary node but using "
2733                                    "DT_REMOTE_RAID1 template")
2734
2735     target_node = secondary_nodes[0]
2736     # check memory requirements on the secondary node
2737     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2738                          instance.name, instance.memory)
2739
2740     # check bridge existance
2741     brlist = [nic.bridge for nic in instance.nics]
2742     if not rpc.call_bridges_exist(target_node, brlist):
2743       raise errors.OpPrereqError("One or more target bridges %s does not"
2744                                  " exist on destination node '%s'" %
2745                                  (brlist, target_node))
2746
2747     self.instance = instance
2748
2749   def Exec(self, feedback_fn):
2750     """Failover an instance.
2751
2752     The failover is done by shutting it down on its present node and
2753     starting it on the secondary.
2754
2755     """
2756     instance = self.instance
2757
2758     source_node = instance.primary_node
2759     target_node = instance.secondary_nodes[0]
2760
2761     feedback_fn("* checking disk consistency between source and target")
2762     for dev in instance.disks:
2763       # for remote_raid1, these are md over drbd
2764       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2765         if instance.status == "up" and not self.op.ignore_consistency:
2766           raise errors.OpExecError("Disk %s is degraded on target node,"
2767                                    " aborting failover." % dev.iv_name)
2768
2769     feedback_fn("* shutting down instance on source node")
2770     logger.Info("Shutting down instance %s on node %s" %
2771                 (instance.name, source_node))
2772
2773     if not rpc.call_instance_shutdown(source_node, instance):
2774       if self.op.ignore_consistency:
2775         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2776                      " anyway. Please make sure node %s is down"  %
2777                      (instance.name, source_node, source_node))
2778       else:
2779         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2780                                  (instance.name, source_node))
2781
2782     feedback_fn("* deactivating the instance's disks on source node")
2783     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2784       raise errors.OpExecError("Can't shut down the instance's disks.")
2785
2786     instance.primary_node = target_node
2787     # distribute new instance config to the other nodes
2788     self.cfg.AddInstance(instance)
2789
2790     # Only start the instance if it's marked as up
2791     if instance.status == "up":
2792       feedback_fn("* activating the instance's disks on target node")
2793       logger.Info("Starting instance %s on node %s" %
2794                   (instance.name, target_node))
2795
2796       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2797                                                ignore_secondaries=True)
2798       if not disks_ok:
2799         _ShutdownInstanceDisks(instance, self.cfg)
2800         raise errors.OpExecError("Can't activate the instance's disks")
2801
2802       feedback_fn("* starting the instance on the target node")
2803       if not rpc.call_instance_start(target_node, instance, None):
2804         _ShutdownInstanceDisks(instance, self.cfg)
2805         raise errors.OpExecError("Could not start instance %s on node %s." %
2806                                  (instance.name, target_node))
2807
2808
2809 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2810   """Create a tree of block devices on the primary node.
2811
2812   This always creates all devices.
2813
2814   """
2815   if device.children:
2816     for child in device.children:
2817       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2818         return False
2819
2820   cfg.SetDiskID(device, node)
2821   new_id = rpc.call_blockdev_create(node, device, device.size,
2822                                     instance.name, True, info)
2823   if not new_id:
2824     return False
2825   if device.physical_id is None:
2826     device.physical_id = new_id
2827   return True
2828
2829
2830 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2831   """Create a tree of block devices on a secondary node.
2832
2833   If this device type has to be created on secondaries, create it and
2834   all its children.
2835
2836   If not, just recurse to children keeping the same 'force' value.
2837
2838   """
2839   if device.CreateOnSecondary():
2840     force = True
2841   if device.children:
2842     for child in device.children:
2843       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2844                                         child, force, info):
2845         return False
2846
2847   if not force:
2848     return True
2849   cfg.SetDiskID(device, node)
2850   new_id = rpc.call_blockdev_create(node, device, device.size,
2851                                     instance.name, False, info)
2852   if not new_id:
2853     return False
2854   if device.physical_id is None:
2855     device.physical_id = new_id
2856   return True
2857
2858
2859 def _GenerateUniqueNames(cfg, exts):
2860   """Generate a suitable LV name.
2861
2862   This will generate a logical volume name for the given instance.
2863
2864   """
2865   results = []
2866   for val in exts:
2867     new_id = cfg.GenerateUniqueID()
2868     results.append("%s%s" % (new_id, val))
2869   return results
2870
2871
2872 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2873   """Generate a drbd device complete with its children.
2874
2875   """
2876   port = cfg.AllocatePort()
2877   vgname = cfg.GetVGName()
2878   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2879                           logical_id=(vgname, names[0]))
2880   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2881                           logical_id=(vgname, names[1]))
2882   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2883                           logical_id = (primary, secondary, port),
2884                           children = [dev_data, dev_meta])
2885   return drbd_dev
2886
2887
2888 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2889   """Generate a drbd8 device complete with its children.
2890
2891   """
2892   port = cfg.AllocatePort()
2893   vgname = cfg.GetVGName()
2894   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2895                           logical_id=(vgname, names[0]))
2896   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2897                           logical_id=(vgname, names[1]))
2898   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2899                           logical_id = (primary, secondary, port),
2900                           children = [dev_data, dev_meta],
2901                           iv_name=iv_name)
2902   return drbd_dev
2903
2904 def _GenerateDiskTemplate(cfg, template_name,
2905                           instance_name, primary_node,
2906                           secondary_nodes, disk_sz, swap_sz):
2907   """Generate the entire disk layout for a given template type.
2908
2909   """
2910   #TODO: compute space requirements
2911
2912   vgname = cfg.GetVGName()
2913   if template_name == constants.DT_DISKLESS:
2914     disks = []
2915   elif template_name == constants.DT_PLAIN:
2916     if len(secondary_nodes) != 0:
2917       raise errors.ProgrammerError("Wrong template configuration")
2918
2919     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2920     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2921                            logical_id=(vgname, names[0]),
2922                            iv_name = "sda")
2923     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2924                            logical_id=(vgname, names[1]),
2925                            iv_name = "sdb")
2926     disks = [sda_dev, sdb_dev]
2927   elif template_name == constants.DT_LOCAL_RAID1:
2928     if len(secondary_nodes) != 0:
2929       raise errors.ProgrammerError("Wrong template configuration")
2930
2931
2932     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2933                                        ".sdb_m1", ".sdb_m2"])
2934     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2935                               logical_id=(vgname, names[0]))
2936     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2937                               logical_id=(vgname, names[1]))
2938     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2939                               size=disk_sz,
2940                               children = [sda_dev_m1, sda_dev_m2])
2941     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2942                               logical_id=(vgname, names[2]))
2943     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2944                               logical_id=(vgname, names[3]))
2945     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2946                               size=swap_sz,
2947                               children = [sdb_dev_m1, sdb_dev_m2])
2948     disks = [md_sda_dev, md_sdb_dev]
2949   elif template_name == constants.DT_REMOTE_RAID1:
2950     if len(secondary_nodes) != 1:
2951       raise errors.ProgrammerError("Wrong template configuration")
2952     remote_node = secondary_nodes[0]
2953     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2954                                        ".sdb_data", ".sdb_meta"])
2955     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2956                                          disk_sz, names[0:2])
2957     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2958                               children = [drbd_sda_dev], size=disk_sz)
2959     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2960                                          swap_sz, names[2:4])
2961     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2962                               children = [drbd_sdb_dev], size=swap_sz)
2963     disks = [md_sda_dev, md_sdb_dev]
2964   elif template_name == constants.DT_DRBD8:
2965     if len(secondary_nodes) != 1:
2966       raise errors.ProgrammerError("Wrong template configuration")
2967     remote_node = secondary_nodes[0]
2968     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2969                                        ".sdb_data", ".sdb_meta"])
2970     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2971                                          disk_sz, names[0:2], "sda")
2972     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2973                                          swap_sz, names[2:4], "sdb")
2974     disks = [drbd_sda_dev, drbd_sdb_dev]
2975   else:
2976     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2977   return disks
2978
2979
2980 def _GetInstanceInfoText(instance):
2981   """Compute that text that should be added to the disk's metadata.
2982
2983   """
2984   return "originstname+%s" % instance.name
2985
2986
2987 def _CreateDisks(cfg, instance):
2988   """Create all disks for an instance.
2989
2990   This abstracts away some work from AddInstance.
2991
2992   Args:
2993     instance: the instance object
2994
2995   Returns:
2996     True or False showing the success of the creation process
2997
2998   """
2999   info = _GetInstanceInfoText(instance)
3000
3001   for device in instance.disks:
3002     logger.Info("creating volume %s for instance %s" %
3003               (device.iv_name, instance.name))
3004     #HARDCODE
3005     for secondary_node in instance.secondary_nodes:
3006       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3007                                         device, False, info):
3008         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3009                      (device.iv_name, device, secondary_node))
3010         return False
3011     #HARDCODE
3012     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3013                                     instance, device, info):
3014       logger.Error("failed to create volume %s on primary!" %
3015                    device.iv_name)
3016       return False
3017   return True
3018
3019
3020 def _RemoveDisks(instance, cfg):
3021   """Remove all disks for an instance.
3022
3023   This abstracts away some work from `AddInstance()` and
3024   `RemoveInstance()`. Note that in case some of the devices couldn't
3025   be removed, the removal will continue with the other ones (compare
3026   with `_CreateDisks()`).
3027
3028   Args:
3029     instance: the instance object
3030
3031   Returns:
3032     True or False showing the success of the removal proces
3033
3034   """
3035   logger.Info("removing block devices for instance %s" % instance.name)
3036
3037   result = True
3038   for device in instance.disks:
3039     for node, disk in device.ComputeNodeTree(instance.primary_node):
3040       cfg.SetDiskID(disk, node)
3041       if not rpc.call_blockdev_remove(node, disk):
3042         logger.Error("could not remove block device %s on node %s,"
3043                      " continuing anyway" %
3044                      (device.iv_name, node))
3045         result = False
3046   return result
3047
3048
3049 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3050   """Compute disk size requirements in the volume group
3051
3052   This is currently hard-coded for the two-drive layout.
3053
3054   """
3055   # Required free disk space as a function of disk and swap space
3056   req_size_dict = {
3057     constants.DT_DISKLESS: None,
3058     constants.DT_PLAIN: disk_size + swap_size,
3059     constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3060     # 256 MB are added for drbd metadata, 128MB for each drbd device
3061     constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3062     constants.DT_DRBD8: disk_size + swap_size + 256,
3063   }
3064
3065   if disk_template not in req_size_dict:
3066     raise errors.ProgrammerError("Disk template '%s' size requirement"
3067                                  " is unknown" %  disk_template)
3068
3069   return req_size_dict[disk_template]
3070
3071
3072 class LUCreateInstance(LogicalUnit):
3073   """Create an instance.
3074
3075   """
3076   HPATH = "instance-add"
3077   HTYPE = constants.HTYPE_INSTANCE
3078   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3079               "disk_template", "swap_size", "mode", "start", "vcpus",
3080               "wait_for_sync", "ip_check", "mac"]
3081
3082   def _RunAllocator(self):
3083     """Run the allocator based on input opcode.
3084
3085     """
3086     disks = [{"size": self.op.disk_size, "mode": "w"},
3087              {"size": self.op.swap_size, "mode": "w"}]
3088     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3089              "bridge": self.op.bridge}]
3090     ial = IAllocator(self.cfg, self.sstore,
3091                      mode=constants.IALLOCATOR_MODE_ALLOC,
3092                      name=self.op.instance_name,
3093                      disk_template=self.op.disk_template,
3094                      tags=[],
3095                      os=self.op.os_type,
3096                      vcpus=self.op.vcpus,
3097                      mem_size=self.op.mem_size,
3098                      disks=disks,
3099                      nics=nics,
3100                      )
3101
3102     ial.Run(self.op.iallocator)
3103
3104     if not ial.success:
3105       raise errors.OpPrereqError("Can't compute nodes using"
3106                                  " iallocator '%s': %s" % (self.op.iallocator,
3107                                                            ial.info))
3108     if len(ial.nodes) != ial.required_nodes:
3109       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3110                                  " of nodes (%s), required %s" %
3111                                  (len(ial.nodes), ial.required_nodes))
3112     self.op.pnode = ial.nodes[0]
3113     logger.ToStdout("Selected nodes for the instance: %s" %
3114                     (", ".join(ial.nodes),))
3115     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3116                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3117     if ial.required_nodes == 2:
3118       self.op.snode = ial.nodes[1]
3119
3120   def BuildHooksEnv(self):
3121     """Build hooks env.
3122
3123     This runs on master, primary and secondary nodes of the instance.
3124
3125     """
3126     env = {
3127       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3128       "INSTANCE_DISK_SIZE": self.op.disk_size,
3129       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3130       "INSTANCE_ADD_MODE": self.op.mode,
3131       }
3132     if self.op.mode == constants.INSTANCE_IMPORT:
3133       env["INSTANCE_SRC_NODE"] = self.op.src_node
3134       env["INSTANCE_SRC_PATH"] = self.op.src_path
3135       env["INSTANCE_SRC_IMAGE"] = self.src_image
3136
3137     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3138       primary_node=self.op.pnode,
3139       secondary_nodes=self.secondaries,
3140       status=self.instance_status,
3141       os_type=self.op.os_type,
3142       memory=self.op.mem_size,
3143       vcpus=self.op.vcpus,
3144       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3145     ))
3146
3147     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3148           self.secondaries)
3149     return env, nl, nl
3150
3151
3152   def CheckPrereq(self):
3153     """Check prerequisites.
3154
3155     """
3156     # set optional parameters to none if they don't exist
3157     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3158                  "iallocator"]:
3159       if not hasattr(self.op, attr):
3160         setattr(self.op, attr, None)
3161
3162     if self.op.mode not in (constants.INSTANCE_CREATE,
3163                             constants.INSTANCE_IMPORT):
3164       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3165                                  self.op.mode)
3166
3167     if self.op.mode == constants.INSTANCE_IMPORT:
3168       src_node = getattr(self.op, "src_node", None)
3169       src_path = getattr(self.op, "src_path", None)
3170       if src_node is None or src_path is None:
3171         raise errors.OpPrereqError("Importing an instance requires source"
3172                                    " node and path options")
3173       src_node_full = self.cfg.ExpandNodeName(src_node)
3174       if src_node_full is None:
3175         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3176       self.op.src_node = src_node = src_node_full
3177
3178       if not os.path.isabs(src_path):
3179         raise errors.OpPrereqError("The source path must be absolute")
3180
3181       export_info = rpc.call_export_info(src_node, src_path)
3182
3183       if not export_info:
3184         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3185
3186       if not export_info.has_section(constants.INISECT_EXP):
3187         raise errors.ProgrammerError("Corrupted export config")
3188
3189       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3190       if (int(ei_version) != constants.EXPORT_VERSION):
3191         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3192                                    (ei_version, constants.EXPORT_VERSION))
3193
3194       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3195         raise errors.OpPrereqError("Can't import instance with more than"
3196                                    " one data disk")
3197
3198       # FIXME: are the old os-es, disk sizes, etc. useful?
3199       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3200       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3201                                                          'disk0_dump'))
3202       self.src_image = diskimage
3203     else: # INSTANCE_CREATE
3204       if getattr(self.op, "os_type", None) is None:
3205         raise errors.OpPrereqError("No guest OS specified")
3206
3207     #### instance parameters check
3208
3209     # disk template and mirror node verification
3210     if self.op.disk_template not in constants.DISK_TEMPLATES:
3211       raise errors.OpPrereqError("Invalid disk template name")
3212
3213     # instance name verification
3214     hostname1 = utils.HostInfo(self.op.instance_name)
3215
3216     self.op.instance_name = instance_name = hostname1.name
3217     instance_list = self.cfg.GetInstanceList()
3218     if instance_name in instance_list:
3219       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3220                                  instance_name)
3221
3222     # ip validity checks
3223     ip = getattr(self.op, "ip", None)
3224     if ip is None or ip.lower() == "none":
3225       inst_ip = None
3226     elif ip.lower() == "auto":
3227       inst_ip = hostname1.ip
3228     else:
3229       if not utils.IsValidIP(ip):
3230         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3231                                    " like a valid IP" % ip)
3232       inst_ip = ip
3233     self.inst_ip = self.op.ip = inst_ip
3234
3235     if self.op.start and not self.op.ip_check:
3236       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3237                                  " adding an instance in start mode")
3238
3239     if self.op.ip_check:
3240       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3241         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3242                                    (hostname1.ip, instance_name))
3243
3244     # MAC address verification
3245     if self.op.mac != "auto":
3246       if not utils.IsValidMac(self.op.mac.lower()):
3247         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3248                                    self.op.mac)
3249
3250     # bridge verification
3251     bridge = getattr(self.op, "bridge", None)
3252     if bridge is None:
3253       self.op.bridge = self.cfg.GetDefBridge()
3254     else:
3255       self.op.bridge = bridge
3256
3257     # boot order verification
3258     if self.op.hvm_boot_order is not None:
3259       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3260         raise errors.OpPrereqError("invalid boot order specified,"
3261                                    " must be one or more of [acdn]")
3262     #### allocator run
3263
3264     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3265       raise errors.OpPrereqError("One and only one of iallocator and primary"
3266                                  " node must be given")
3267
3268     if self.op.iallocator is not None:
3269       self._RunAllocator()
3270
3271     #### node related checks
3272
3273     # check primary node
3274     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3275     if pnode is None:
3276       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3277                                  self.op.pnode)
3278     self.op.pnode = pnode.name
3279     self.pnode = pnode
3280     self.secondaries = []
3281
3282     # mirror node verification
3283     if self.op.disk_template in constants.DTS_NET_MIRROR:
3284       if getattr(self.op, "snode", None) is None:
3285         raise errors.OpPrereqError("The networked disk templates need"
3286                                    " a mirror node")
3287
3288       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3289       if snode_name is None:
3290         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3291                                    self.op.snode)
3292       elif snode_name == pnode.name:
3293         raise errors.OpPrereqError("The secondary node cannot be"
3294                                    " the primary node.")
3295       self.secondaries.append(snode_name)
3296
3297     req_size = _ComputeDiskSize(self.op.disk_template,
3298                                 self.op.disk_size, self.op.swap_size)
3299
3300     # Check lv size requirements
3301     if req_size is not None:
3302       nodenames = [pnode.name] + self.secondaries
3303       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3304       for node in nodenames:
3305         info = nodeinfo.get(node, None)
3306         if not info:
3307           raise errors.OpPrereqError("Cannot get current information"
3308                                      " from node '%s'" % nodeinfo)
3309         vg_free = info.get('vg_free', None)
3310         if not isinstance(vg_free, int):
3311           raise errors.OpPrereqError("Can't compute free disk space on"
3312                                      " node %s" % node)
3313         if req_size > info['vg_free']:
3314           raise errors.OpPrereqError("Not enough disk space on target node %s."
3315                                      " %d MB available, %d MB required" %
3316                                      (node, info['vg_free'], req_size))
3317
3318     # os verification
3319     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3320     if not os_obj:
3321       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3322                                  " primary node"  % self.op.os_type)
3323
3324     if self.op.kernel_path == constants.VALUE_NONE:
3325       raise errors.OpPrereqError("Can't set instance kernel to none")
3326
3327
3328     # bridge check on primary node
3329     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3330       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3331                                  " destination node '%s'" %
3332                                  (self.op.bridge, pnode.name))
3333
3334     if self.op.start:
3335       self.instance_status = 'up'
3336     else:
3337       self.instance_status = 'down'
3338
3339   def Exec(self, feedback_fn):
3340     """Create and add the instance to the cluster.
3341
3342     """
3343     instance = self.op.instance_name
3344     pnode_name = self.pnode.name
3345
3346     if self.op.mac == "auto":
3347       mac_address = self.cfg.GenerateMAC()
3348     else:
3349       mac_address = self.op.mac
3350
3351     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3352     if self.inst_ip is not None:
3353       nic.ip = self.inst_ip
3354
3355     ht_kind = self.sstore.GetHypervisorType()
3356     if ht_kind in constants.HTS_REQ_PORT:
3357       network_port = self.cfg.AllocatePort()
3358     else:
3359       network_port = None
3360
3361     disks = _GenerateDiskTemplate(self.cfg,
3362                                   self.op.disk_template,
3363                                   instance, pnode_name,
3364                                   self.secondaries, self.op.disk_size,
3365                                   self.op.swap_size)
3366
3367     iobj = objects.Instance(name=instance, os=self.op.os_type,
3368                             primary_node=pnode_name,
3369                             memory=self.op.mem_size,
3370                             vcpus=self.op.vcpus,
3371                             nics=[nic], disks=disks,
3372                             disk_template=self.op.disk_template,
3373                             status=self.instance_status,
3374                             network_port=network_port,
3375                             kernel_path=self.op.kernel_path,
3376                             initrd_path=self.op.initrd_path,
3377                             hvm_boot_order=self.op.hvm_boot_order,
3378                             )
3379
3380     feedback_fn("* creating instance disks...")
3381     if not _CreateDisks(self.cfg, iobj):
3382       _RemoveDisks(iobj, self.cfg)
3383       raise errors.OpExecError("Device creation failed, reverting...")
3384
3385     feedback_fn("adding instance %s to cluster config" % instance)
3386
3387     self.cfg.AddInstance(iobj)
3388
3389     if self.op.wait_for_sync:
3390       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3391     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3392       # make sure the disks are not degraded (still sync-ing is ok)
3393       time.sleep(15)
3394       feedback_fn("* checking mirrors status")
3395       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3396     else:
3397       disk_abort = False
3398
3399     if disk_abort:
3400       _RemoveDisks(iobj, self.cfg)
3401       self.cfg.RemoveInstance(iobj.name)
3402       raise errors.OpExecError("There are some degraded disks for"
3403                                " this instance")
3404
3405     feedback_fn("creating os for instance %s on node %s" %
3406                 (instance, pnode_name))
3407
3408     if iobj.disk_template != constants.DT_DISKLESS:
3409       if self.op.mode == constants.INSTANCE_CREATE:
3410         feedback_fn("* running the instance OS create scripts...")
3411         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3412           raise errors.OpExecError("could not add os for instance %s"
3413                                    " on node %s" %
3414                                    (instance, pnode_name))
3415
3416       elif self.op.mode == constants.INSTANCE_IMPORT:
3417         feedback_fn("* running the instance OS import scripts...")
3418         src_node = self.op.src_node
3419         src_image = self.src_image
3420         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3421                                                 src_node, src_image):
3422           raise errors.OpExecError("Could not import os for instance"
3423                                    " %s on node %s" %
3424                                    (instance, pnode_name))
3425       else:
3426         # also checked in the prereq part
3427         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3428                                      % self.op.mode)
3429
3430     if self.op.start:
3431       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3432       feedback_fn("* starting instance...")
3433       if not rpc.call_instance_start(pnode_name, iobj, None):
3434         raise errors.OpExecError("Could not start instance")
3435
3436
3437 class LUConnectConsole(NoHooksLU):
3438   """Connect to an instance's console.
3439
3440   This is somewhat special in that it returns the command line that
3441   you need to run on the master node in order to connect to the
3442   console.
3443
3444   """
3445   _OP_REQP = ["instance_name"]
3446
3447   def CheckPrereq(self):
3448     """Check prerequisites.
3449
3450     This checks that the instance is in the cluster.
3451
3452     """
3453     instance = self.cfg.GetInstanceInfo(
3454       self.cfg.ExpandInstanceName(self.op.instance_name))
3455     if instance is None:
3456       raise errors.OpPrereqError("Instance '%s' not known" %
3457                                  self.op.instance_name)
3458     self.instance = instance
3459
3460   def Exec(self, feedback_fn):
3461     """Connect to the console of an instance
3462
3463     """
3464     instance = self.instance
3465     node = instance.primary_node
3466
3467     node_insts = rpc.call_instance_list([node])[node]
3468     if node_insts is False:
3469       raise errors.OpExecError("Can't connect to node %s." % node)
3470
3471     if instance.name not in node_insts:
3472       raise errors.OpExecError("Instance %s is not running." % instance.name)
3473
3474     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3475
3476     hyper = hypervisor.GetHypervisor()
3477     console_cmd = hyper.GetShellCommandForConsole(instance)
3478     # build ssh cmdline
3479     argv = ["ssh", "-q", "-t"]
3480     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3481     argv.extend(ssh.BATCH_MODE_OPTS)
3482     argv.append(node)
3483     argv.append(console_cmd)
3484     return "ssh", argv
3485
3486
3487 class LUAddMDDRBDComponent(LogicalUnit):
3488   """Adda new mirror member to an instance's disk.
3489
3490   """
3491   HPATH = "mirror-add"
3492   HTYPE = constants.HTYPE_INSTANCE
3493   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3494
3495   def BuildHooksEnv(self):
3496     """Build hooks env.
3497
3498     This runs on the master, the primary and all the secondaries.
3499
3500     """
3501     env = {
3502       "NEW_SECONDARY": self.op.remote_node,
3503       "DISK_NAME": self.op.disk_name,
3504       }
3505     env.update(_BuildInstanceHookEnvByObject(self.instance))
3506     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3507           self.op.remote_node,] + list(self.instance.secondary_nodes)
3508     return env, nl, nl
3509
3510   def CheckPrereq(self):
3511     """Check prerequisites.
3512
3513     This checks that the instance is in the cluster.
3514
3515     """
3516     instance = self.cfg.GetInstanceInfo(
3517       self.cfg.ExpandInstanceName(self.op.instance_name))
3518     if instance is None:
3519       raise errors.OpPrereqError("Instance '%s' not known" %
3520                                  self.op.instance_name)
3521     self.instance = instance
3522
3523     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3524     if remote_node is None:
3525       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3526     self.remote_node = remote_node
3527
3528     if remote_node == instance.primary_node:
3529       raise errors.OpPrereqError("The specified node is the primary node of"
3530                                  " the instance.")
3531
3532     if instance.disk_template != constants.DT_REMOTE_RAID1:
3533       raise errors.OpPrereqError("Instance's disk layout is not"
3534                                  " remote_raid1.")
3535     for disk in instance.disks:
3536       if disk.iv_name == self.op.disk_name:
3537         break
3538     else:
3539       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3540                                  " instance." % self.op.disk_name)
3541     if len(disk.children) > 1:
3542       raise errors.OpPrereqError("The device already has two slave devices."
3543                                  " This would create a 3-disk raid1 which we"
3544                                  " don't allow.")
3545     self.disk = disk
3546
3547   def Exec(self, feedback_fn):
3548     """Add the mirror component
3549
3550     """
3551     disk = self.disk
3552     instance = self.instance
3553
3554     remote_node = self.remote_node
3555     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3556     names = _GenerateUniqueNames(self.cfg, lv_names)
3557     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3558                                      remote_node, disk.size, names)
3559
3560     logger.Info("adding new mirror component on secondary")
3561     #HARDCODE
3562     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3563                                       new_drbd, False,
3564                                       _GetInstanceInfoText(instance)):
3565       raise errors.OpExecError("Failed to create new component on secondary"
3566                                " node %s" % remote_node)
3567
3568     logger.Info("adding new mirror component on primary")
3569     #HARDCODE
3570     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3571                                     instance, new_drbd,
3572                                     _GetInstanceInfoText(instance)):
3573       # remove secondary dev
3574       self.cfg.SetDiskID(new_drbd, remote_node)
3575       rpc.call_blockdev_remove(remote_node, new_drbd)
3576       raise errors.OpExecError("Failed to create volume on primary")
3577
3578     # the device exists now
3579     # call the primary node to add the mirror to md
3580     logger.Info("adding new mirror component to md")
3581     if not rpc.call_blockdev_addchildren(instance.primary_node,
3582                                          disk, [new_drbd]):
3583       logger.Error("Can't add mirror compoment to md!")
3584       self.cfg.SetDiskID(new_drbd, remote_node)
3585       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3586         logger.Error("Can't rollback on secondary")
3587       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3588       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3589         logger.Error("Can't rollback on primary")
3590       raise errors.OpExecError("Can't add mirror component to md array")
3591
3592     disk.children.append(new_drbd)
3593
3594     self.cfg.AddInstance(instance)
3595
3596     _WaitForSync(self.cfg, instance, self.proc)
3597
3598     return 0
3599
3600
3601 class LURemoveMDDRBDComponent(LogicalUnit):
3602   """Remove a component from a remote_raid1 disk.
3603
3604   """
3605   HPATH = "mirror-remove"
3606   HTYPE = constants.HTYPE_INSTANCE
3607   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3608
3609   def BuildHooksEnv(self):
3610     """Build hooks env.
3611
3612     This runs on the master, the primary and all the secondaries.
3613
3614     """
3615     env = {
3616       "DISK_NAME": self.op.disk_name,
3617       "DISK_ID": self.op.disk_id,
3618       "OLD_SECONDARY": self.old_secondary,
3619       }
3620     env.update(_BuildInstanceHookEnvByObject(self.instance))
3621     nl = [self.sstore.GetMasterNode(),
3622           self.instance.primary_node] + list(self.instance.secondary_nodes)
3623     return env, nl, nl
3624
3625   def CheckPrereq(self):
3626     """Check prerequisites.
3627
3628     This checks that the instance is in the cluster.
3629
3630     """
3631     instance = self.cfg.GetInstanceInfo(
3632       self.cfg.ExpandInstanceName(self.op.instance_name))
3633     if instance is None:
3634       raise errors.OpPrereqError("Instance '%s' not known" %
3635                                  self.op.instance_name)
3636     self.instance = instance
3637
3638     if instance.disk_template != constants.DT_REMOTE_RAID1:
3639       raise errors.OpPrereqError("Instance's disk layout is not"
3640                                  " remote_raid1.")
3641     for disk in instance.disks:
3642       if disk.iv_name == self.op.disk_name:
3643         break
3644     else:
3645       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3646                                  " instance." % self.op.disk_name)
3647     for child in disk.children:
3648       if (child.dev_type == constants.LD_DRBD7 and
3649           child.logical_id[2] == self.op.disk_id):
3650         break
3651     else:
3652       raise errors.OpPrereqError("Can't find the device with this port.")
3653
3654     if len(disk.children) < 2:
3655       raise errors.OpPrereqError("Cannot remove the last component from"
3656                                  " a mirror.")
3657     self.disk = disk
3658     self.child = child
3659     if self.child.logical_id[0] == instance.primary_node:
3660       oid = 1
3661     else:
3662       oid = 0
3663     self.old_secondary = self.child.logical_id[oid]
3664
3665   def Exec(self, feedback_fn):
3666     """Remove the mirror component
3667
3668     """
3669     instance = self.instance
3670     disk = self.disk
3671     child = self.child
3672     logger.Info("remove mirror component")
3673     self.cfg.SetDiskID(disk, instance.primary_node)
3674     if not rpc.call_blockdev_removechildren(instance.primary_node,
3675                                             disk, [child]):
3676       raise errors.OpExecError("Can't remove child from mirror.")
3677
3678     for node in child.logical_id[:2]:
3679       self.cfg.SetDiskID(child, node)
3680       if not rpc.call_blockdev_remove(node, child):
3681         logger.Error("Warning: failed to remove device from node %s,"
3682                      " continuing operation." % node)
3683
3684     disk.children.remove(child)
3685     self.cfg.AddInstance(instance)
3686
3687
3688 class LUReplaceDisks(LogicalUnit):
3689   """Replace the disks of an instance.
3690
3691   """
3692   HPATH = "mirrors-replace"
3693   HTYPE = constants.HTYPE_INSTANCE
3694   _OP_REQP = ["instance_name", "mode", "disks"]
3695
3696   def _RunAllocator(self):
3697     """Compute a new secondary node using an IAllocator.
3698
3699     """
3700     ial = IAllocator(self.cfg, self.sstore,
3701                      mode=constants.IALLOCATOR_MODE_RELOC,
3702                      name=self.op.instance_name,
3703                      relocate_from=[self.sec_node])
3704
3705     ial.Run(self.op.iallocator)
3706
3707     if not ial.success:
3708       raise errors.OpPrereqError("Can't compute nodes using"
3709                                  " iallocator '%s': %s" % (self.op.iallocator,
3710                                                            ial.info))
3711     if len(ial.nodes) != ial.required_nodes:
3712       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3713                                  " of nodes (%s), required %s" %
3714                                  (len(ial.nodes), ial.required_nodes))
3715     self.op.remote_node = ial.nodes[0]
3716     logger.ToStdout("Selected new secondary for the instance: %s" %
3717                     self.op.remote_node)
3718
3719   def BuildHooksEnv(self):
3720     """Build hooks env.
3721
3722     This runs on the master, the primary and all the secondaries.
3723
3724     """
3725     env = {
3726       "MODE": self.op.mode,
3727       "NEW_SECONDARY": self.op.remote_node,
3728       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3729       }
3730     env.update(_BuildInstanceHookEnvByObject(self.instance))
3731     nl = [
3732       self.sstore.GetMasterNode(),
3733       self.instance.primary_node,
3734       ]
3735     if self.op.remote_node is not None:
3736       nl.append(self.op.remote_node)
3737     return env, nl, nl
3738
3739   def CheckPrereq(self):
3740     """Check prerequisites.
3741
3742     This checks that the instance is in the cluster.
3743
3744     """
3745     if not hasattr(self.op, "remote_node"):
3746       self.op.remote_node = None
3747
3748     instance = self.cfg.GetInstanceInfo(
3749       self.cfg.ExpandInstanceName(self.op.instance_name))
3750     if instance is None:
3751       raise errors.OpPrereqError("Instance '%s' not known" %
3752                                  self.op.instance_name)
3753     self.instance = instance
3754     self.op.instance_name = instance.name
3755
3756     if instance.disk_template not in constants.DTS_NET_MIRROR:
3757       raise errors.OpPrereqError("Instance's disk layout is not"
3758                                  " network mirrored.")
3759
3760     if len(instance.secondary_nodes) != 1:
3761       raise errors.OpPrereqError("The instance has a strange layout,"
3762                                  " expected one secondary but found %d" %
3763                                  len(instance.secondary_nodes))
3764
3765     self.sec_node = instance.secondary_nodes[0]
3766
3767     ia_name = getattr(self.op, "iallocator", None)
3768     if ia_name is not None:
3769       if self.op.remote_node is not None:
3770         raise errors.OpPrereqError("Give either the iallocator or the new"
3771                                    " secondary, not both")
3772       self.op.remote_node = self._RunAllocator()
3773
3774     remote_node = self.op.remote_node
3775     if remote_node is not None:
3776       remote_node = self.cfg.ExpandNodeName(remote_node)
3777       if remote_node is None:
3778         raise errors.OpPrereqError("Node '%s' not known" %
3779                                    self.op.remote_node)
3780       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3781     else:
3782       self.remote_node_info = None
3783     if remote_node == instance.primary_node:
3784       raise errors.OpPrereqError("The specified node is the primary node of"
3785                                  " the instance.")
3786     elif remote_node == self.sec_node:
3787       if self.op.mode == constants.REPLACE_DISK_SEC:
3788         # this is for DRBD8, where we can't execute the same mode of
3789         # replacement as for drbd7 (no different port allocated)
3790         raise errors.OpPrereqError("Same secondary given, cannot execute"
3791                                    " replacement")
3792       # the user gave the current secondary, switch to
3793       # 'no-replace-secondary' mode for drbd7
3794       remote_node = None
3795     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3796         self.op.mode != constants.REPLACE_DISK_ALL):
3797       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3798                                  " disks replacement, not individual ones")
3799     if instance.disk_template == constants.DT_DRBD8:
3800       if (self.op.mode == constants.REPLACE_DISK_ALL and
3801           remote_node is not None):
3802         # switch to replace secondary mode
3803         self.op.mode = constants.REPLACE_DISK_SEC
3804
3805       if self.op.mode == constants.REPLACE_DISK_ALL:
3806         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3807                                    " secondary disk replacement, not"
3808                                    " both at once")
3809       elif self.op.mode == constants.REPLACE_DISK_PRI:
3810         if remote_node is not None:
3811           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3812                                      " the secondary while doing a primary"
3813                                      " node disk replacement")
3814         self.tgt_node = instance.primary_node
3815         self.oth_node = instance.secondary_nodes[0]
3816       elif self.op.mode == constants.REPLACE_DISK_SEC:
3817         self.new_node = remote_node # this can be None, in which case
3818                                     # we don't change the secondary
3819         self.tgt_node = instance.secondary_nodes[0]
3820         self.oth_node = instance.primary_node
3821       else:
3822         raise errors.ProgrammerError("Unhandled disk replace mode")
3823
3824     for name in self.op.disks:
3825       if instance.FindDisk(name) is None:
3826         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3827                                    (name, instance.name))
3828     self.op.remote_node = remote_node
3829
3830   def _ExecRR1(self, feedback_fn):
3831     """Replace the disks of an instance.
3832
3833     """
3834     instance = self.instance
3835     iv_names = {}
3836     # start of work
3837     if self.op.remote_node is None:
3838       remote_node = self.sec_node
3839     else:
3840       remote_node = self.op.remote_node
3841     cfg = self.cfg
3842     for dev in instance.disks:
3843       size = dev.size
3844       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3845       names = _GenerateUniqueNames(cfg, lv_names)
3846       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3847                                        remote_node, size, names)
3848       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3849       logger.Info("adding new mirror component on secondary for %s" %
3850                   dev.iv_name)
3851       #HARDCODE
3852       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3853                                         new_drbd, False,
3854                                         _GetInstanceInfoText(instance)):
3855         raise errors.OpExecError("Failed to create new component on secondary"
3856                                  " node %s. Full abort, cleanup manually!" %
3857                                  remote_node)
3858
3859       logger.Info("adding new mirror component on primary")
3860       #HARDCODE
3861       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3862                                       instance, new_drbd,
3863                                       _GetInstanceInfoText(instance)):
3864         # remove secondary dev
3865         cfg.SetDiskID(new_drbd, remote_node)
3866         rpc.call_blockdev_remove(remote_node, new_drbd)
3867         raise errors.OpExecError("Failed to create volume on primary!"
3868                                  " Full abort, cleanup manually!!")
3869
3870       # the device exists now
3871       # call the primary node to add the mirror to md
3872       logger.Info("adding new mirror component to md")
3873       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3874                                            [new_drbd]):
3875         logger.Error("Can't add mirror compoment to md!")
3876         cfg.SetDiskID(new_drbd, remote_node)
3877         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3878           logger.Error("Can't rollback on secondary")
3879         cfg.SetDiskID(new_drbd, instance.primary_node)
3880         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3881           logger.Error("Can't rollback on primary")
3882         raise errors.OpExecError("Full abort, cleanup manually!!")
3883
3884       dev.children.append(new_drbd)
3885       cfg.AddInstance(instance)
3886
3887     # this can fail as the old devices are degraded and _WaitForSync
3888     # does a combined result over all disks, so we don't check its
3889     # return value
3890     _WaitForSync(cfg, instance, self.proc, unlock=True)
3891
3892     # so check manually all the devices
3893     for name in iv_names:
3894       dev, child, new_drbd = iv_names[name]
3895       cfg.SetDiskID(dev, instance.primary_node)
3896       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3897       if is_degr:
3898         raise errors.OpExecError("MD device %s is degraded!" % name)
3899       cfg.SetDiskID(new_drbd, instance.primary_node)
3900       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3901       if is_degr:
3902         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3903
3904     for name in iv_names:
3905       dev, child, new_drbd = iv_names[name]
3906       logger.Info("remove mirror %s component" % name)
3907       cfg.SetDiskID(dev, instance.primary_node)
3908       if not rpc.call_blockdev_removechildren(instance.primary_node,
3909                                               dev, [child]):
3910         logger.Error("Can't remove child from mirror, aborting"
3911                      " *this device cleanup*.\nYou need to cleanup manually!!")
3912         continue
3913
3914       for node in child.logical_id[:2]:
3915         logger.Info("remove child device on %s" % node)
3916         cfg.SetDiskID(child, node)
3917         if not rpc.call_blockdev_remove(node, child):
3918           logger.Error("Warning: failed to remove device from node %s,"
3919                        " continuing operation." % node)
3920
3921       dev.children.remove(child)
3922
3923       cfg.AddInstance(instance)
3924
3925   def _ExecD8DiskOnly(self, feedback_fn):
3926     """Replace a disk on the primary or secondary for dbrd8.
3927
3928     The algorithm for replace is quite complicated:
3929       - for each disk to be replaced:
3930         - create new LVs on the target node with unique names
3931         - detach old LVs from the drbd device
3932         - rename old LVs to name_replaced.<time_t>
3933         - rename new LVs to old LVs
3934         - attach the new LVs (with the old names now) to the drbd device
3935       - wait for sync across all devices
3936       - for each modified disk:
3937         - remove old LVs (which have the name name_replaces.<time_t>)
3938
3939     Failures are not very well handled.
3940
3941     """
3942     steps_total = 6
3943     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3944     instance = self.instance
3945     iv_names = {}
3946     vgname = self.cfg.GetVGName()
3947     # start of work
3948     cfg = self.cfg
3949     tgt_node = self.tgt_node
3950     oth_node = self.oth_node
3951
3952     # Step: check device activation
3953     self.proc.LogStep(1, steps_total, "check device existence")
3954     info("checking volume groups")
3955     my_vg = cfg.GetVGName()
3956     results = rpc.call_vg_list([oth_node, tgt_node])
3957     if not results:
3958       raise errors.OpExecError("Can't list volume groups on the nodes")
3959     for node in oth_node, tgt_node:
3960       res = results.get(node, False)
3961       if not res or my_vg not in res:
3962         raise errors.OpExecError("Volume group '%s' not found on %s" %
3963                                  (my_vg, node))
3964     for dev in instance.disks:
3965       if not dev.iv_name in self.op.disks:
3966         continue
3967       for node in tgt_node, oth_node:
3968         info("checking %s on %s" % (dev.iv_name, node))
3969         cfg.SetDiskID(dev, node)
3970         if not rpc.call_blockdev_find(node, dev):
3971           raise errors.OpExecError("Can't find device %s on node %s" %
3972                                    (dev.iv_name, node))
3973
3974     # Step: check other node consistency
3975     self.proc.LogStep(2, steps_total, "check peer consistency")
3976     for dev in instance.disks:
3977       if not dev.iv_name in self.op.disks:
3978         continue
3979       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3980       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3981                                    oth_node==instance.primary_node):
3982         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3983                                  " to replace disks on this node (%s)" %
3984                                  (oth_node, tgt_node))
3985
3986     # Step: create new storage
3987     self.proc.LogStep(3, steps_total, "allocate new storage")
3988     for dev in instance.disks:
3989       if not dev.iv_name in self.op.disks:
3990         continue
3991       size = dev.size
3992       cfg.SetDiskID(dev, tgt_node)
3993       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3994       names = _GenerateUniqueNames(cfg, lv_names)
3995       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3996                              logical_id=(vgname, names[0]))
3997       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3998                              logical_id=(vgname, names[1]))
3999       new_lvs = [lv_data, lv_meta]
4000       old_lvs = dev.children
4001       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4002       info("creating new local storage on %s for %s" %
4003            (tgt_node, dev.iv_name))
4004       # since we *always* want to create this LV, we use the
4005       # _Create...OnPrimary (which forces the creation), even if we
4006       # are talking about the secondary node
4007       for new_lv in new_lvs:
4008         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4009                                         _GetInstanceInfoText(instance)):
4010           raise errors.OpExecError("Failed to create new LV named '%s' on"
4011                                    " node '%s'" %
4012                                    (new_lv.logical_id[1], tgt_node))
4013
4014     # Step: for each lv, detach+rename*2+attach
4015     self.proc.LogStep(4, steps_total, "change drbd configuration")
4016     for dev, old_lvs, new_lvs in iv_names.itervalues():
4017       info("detaching %s drbd from local storage" % dev.iv_name)
4018       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4019         raise errors.OpExecError("Can't detach drbd from local storage on node"
4020                                  " %s for device %s" % (tgt_node, dev.iv_name))
4021       #dev.children = []
4022       #cfg.Update(instance)
4023
4024       # ok, we created the new LVs, so now we know we have the needed
4025       # storage; as such, we proceed on the target node to rename
4026       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4027       # using the assumption that logical_id == physical_id (which in
4028       # turn is the unique_id on that node)
4029
4030       # FIXME(iustin): use a better name for the replaced LVs
4031       temp_suffix = int(time.time())
4032       ren_fn = lambda d, suff: (d.physical_id[0],
4033                                 d.physical_id[1] + "_replaced-%s" % suff)
4034       # build the rename list based on what LVs exist on the node
4035       rlist = []
4036       for to_ren in old_lvs:
4037         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4038         if find_res is not None: # device exists
4039           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4040
4041       info("renaming the old LVs on the target node")
4042       if not rpc.call_blockdev_rename(tgt_node, rlist):
4043         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4044       # now we rename the new LVs to the old LVs
4045       info("renaming the new LVs on the target node")
4046       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4047       if not rpc.call_blockdev_rename(tgt_node, rlist):
4048         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4049
4050       for old, new in zip(old_lvs, new_lvs):
4051         new.logical_id = old.logical_id
4052         cfg.SetDiskID(new, tgt_node)
4053
4054       for disk in old_lvs:
4055         disk.logical_id = ren_fn(disk, temp_suffix)
4056         cfg.SetDiskID(disk, tgt_node)
4057
4058       # now that the new lvs have the old name, we can add them to the device
4059       info("adding new mirror component on %s" % tgt_node)
4060       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4061         for new_lv in new_lvs:
4062           if not rpc.call_blockdev_remove(tgt_node, new_lv):
4063             warning("Can't rollback device %s", hint="manually cleanup unused"
4064                     " logical volumes")
4065         raise errors.OpExecError("Can't add local storage to drbd")
4066
4067       dev.children = new_lvs
4068       cfg.Update(instance)
4069
4070     # Step: wait for sync
4071
4072     # this can fail as the old devices are degraded and _WaitForSync
4073     # does a combined result over all disks, so we don't check its
4074     # return value
4075     self.proc.LogStep(5, steps_total, "sync devices")
4076     _WaitForSync(cfg, instance, self.proc, unlock=True)
4077
4078     # so check manually all the devices
4079     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4080       cfg.SetDiskID(dev, instance.primary_node)
4081       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4082       if is_degr:
4083         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4084
4085     # Step: remove old storage
4086     self.proc.LogStep(6, steps_total, "removing old storage")
4087     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4088       info("remove logical volumes for %s" % name)
4089       for lv in old_lvs:
4090         cfg.SetDiskID(lv, tgt_node)
4091         if not rpc.call_blockdev_remove(tgt_node, lv):
4092           warning("Can't remove old LV", hint="manually remove unused LVs")
4093           continue
4094
4095   def _ExecD8Secondary(self, feedback_fn):
4096     """Replace the secondary node for drbd8.
4097
4098     The algorithm for replace is quite complicated:
4099       - for all disks of the instance:
4100         - create new LVs on the new node with same names
4101         - shutdown the drbd device on the old secondary
4102         - disconnect the drbd network on the primary
4103         - create the drbd device on the new secondary
4104         - network attach the drbd on the primary, using an artifice:
4105           the drbd code for Attach() will connect to the network if it
4106           finds a device which is connected to the good local disks but
4107           not network enabled
4108       - wait for sync across all devices
4109       - remove all disks from the old secondary
4110
4111     Failures are not very well handled.
4112
4113     """
4114     steps_total = 6
4115     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4116     instance = self.instance
4117     iv_names = {}
4118     vgname = self.cfg.GetVGName()
4119     # start of work
4120     cfg = self.cfg
4121     old_node = self.tgt_node
4122     new_node = self.new_node
4123     pri_node = instance.primary_node
4124
4125     # Step: check device activation
4126     self.proc.LogStep(1, steps_total, "check device existence")
4127     info("checking volume groups")
4128     my_vg = cfg.GetVGName()
4129     results = rpc.call_vg_list([pri_node, new_node])
4130     if not results:
4131       raise errors.OpExecError("Can't list volume groups on the nodes")
4132     for node in pri_node, new_node:
4133       res = results.get(node, False)
4134       if not res or my_vg not in res:
4135         raise errors.OpExecError("Volume group '%s' not found on %s" %
4136                                  (my_vg, node))
4137     for dev in instance.disks:
4138       if not dev.iv_name in self.op.disks:
4139         continue
4140       info("checking %s on %s" % (dev.iv_name, pri_node))
4141       cfg.SetDiskID(dev, pri_node)
4142       if not rpc.call_blockdev_find(pri_node, dev):
4143         raise errors.OpExecError("Can't find device %s on node %s" %
4144                                  (dev.iv_name, pri_node))
4145
4146     # Step: check other node consistency
4147     self.proc.LogStep(2, steps_total, "check peer consistency")
4148     for dev in instance.disks:
4149       if not dev.iv_name in self.op.disks:
4150         continue
4151       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4152       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4153         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4154                                  " unsafe to replace the secondary" %
4155                                  pri_node)
4156
4157     # Step: create new storage
4158     self.proc.LogStep(3, steps_total, "allocate new storage")
4159     for dev in instance.disks:
4160       size = dev.size
4161       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4162       # since we *always* want to create this LV, we use the
4163       # _Create...OnPrimary (which forces the creation), even if we
4164       # are talking about the secondary node
4165       for new_lv in dev.children:
4166         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4167                                         _GetInstanceInfoText(instance)):
4168           raise errors.OpExecError("Failed to create new LV named '%s' on"
4169                                    " node '%s'" %
4170                                    (new_lv.logical_id[1], new_node))
4171
4172       iv_names[dev.iv_name] = (dev, dev.children)
4173
4174     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4175     for dev in instance.disks:
4176       size = dev.size
4177       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4178       # create new devices on new_node
4179       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4180                               logical_id=(pri_node, new_node,
4181                                           dev.logical_id[2]),
4182                               children=dev.children)
4183       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4184                                         new_drbd, False,
4185                                       _GetInstanceInfoText(instance)):
4186         raise errors.OpExecError("Failed to create new DRBD on"
4187                                  " node '%s'" % new_node)
4188
4189     for dev in instance.disks:
4190       # we have new devices, shutdown the drbd on the old secondary
4191       info("shutting down drbd for %s on old node" % dev.iv_name)
4192       cfg.SetDiskID(dev, old_node)
4193       if not rpc.call_blockdev_shutdown(old_node, dev):
4194         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4195                 hint="Please cleanup this device manually as soon as possible")
4196
4197     info("detaching primary drbds from the network (=> standalone)")
4198     done = 0
4199     for dev in instance.disks:
4200       cfg.SetDiskID(dev, pri_node)
4201       # set the physical (unique in bdev terms) id to None, meaning
4202       # detach from network
4203       dev.physical_id = (None,) * len(dev.physical_id)
4204       # and 'find' the device, which will 'fix' it to match the
4205       # standalone state
4206       if rpc.call_blockdev_find(pri_node, dev):
4207         done += 1
4208       else:
4209         warning("Failed to detach drbd %s from network, unusual case" %
4210                 dev.iv_name)
4211
4212     if not done:
4213       # no detaches succeeded (very unlikely)
4214       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4215
4216     # if we managed to detach at least one, we update all the disks of
4217     # the instance to point to the new secondary
4218     info("updating instance configuration")
4219     for dev in instance.disks:
4220       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4221       cfg.SetDiskID(dev, pri_node)
4222     cfg.Update(instance)
4223
4224     # and now perform the drbd attach
4225     info("attaching primary drbds to new secondary (standalone => connected)")
4226     failures = []
4227     for dev in instance.disks:
4228       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4229       # since the attach is smart, it's enough to 'find' the device,
4230       # it will automatically activate the network, if the physical_id
4231       # is correct
4232       cfg.SetDiskID(dev, pri_node)
4233       if not rpc.call_blockdev_find(pri_node, dev):
4234         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4235                 "please do a gnt-instance info to see the status of disks")
4236
4237     # this can fail as the old devices are degraded and _WaitForSync
4238     # does a combined result over all disks, so we don't check its
4239     # return value
4240     self.proc.LogStep(5, steps_total, "sync devices")
4241     _WaitForSync(cfg, instance, self.proc, unlock=True)
4242
4243     # so check manually all the devices
4244     for name, (dev, old_lvs) in iv_names.iteritems():
4245       cfg.SetDiskID(dev, pri_node)
4246       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4247       if is_degr:
4248         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4249
4250     self.proc.LogStep(6, steps_total, "removing old storage")
4251     for name, (dev, old_lvs) in iv_names.iteritems():
4252       info("remove logical volumes for %s" % name)
4253       for lv in old_lvs:
4254         cfg.SetDiskID(lv, old_node)
4255         if not rpc.call_blockdev_remove(old_node, lv):
4256           warning("Can't remove LV on old secondary",
4257                   hint="Cleanup stale volumes by hand")
4258
4259   def Exec(self, feedback_fn):
4260     """Execute disk replacement.
4261
4262     This dispatches the disk replacement to the appropriate handler.
4263
4264     """
4265     instance = self.instance
4266     if instance.disk_template == constants.DT_REMOTE_RAID1:
4267       fn = self._ExecRR1
4268     elif instance.disk_template == constants.DT_DRBD8:
4269       if self.op.remote_node is None:
4270         fn = self._ExecD8DiskOnly
4271       else:
4272         fn = self._ExecD8Secondary
4273     else:
4274       raise errors.ProgrammerError("Unhandled disk replacement case")
4275     return fn(feedback_fn)
4276
4277
4278 class LUQueryInstanceData(NoHooksLU):
4279   """Query runtime instance data.
4280
4281   """
4282   _OP_REQP = ["instances"]
4283
4284   def CheckPrereq(self):
4285     """Check prerequisites.
4286
4287     This only checks the optional instance list against the existing names.
4288
4289     """
4290     if not isinstance(self.op.instances, list):
4291       raise errors.OpPrereqError("Invalid argument type 'instances'")
4292     if self.op.instances:
4293       self.wanted_instances = []
4294       names = self.op.instances
4295       for name in names:
4296         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4297         if instance is None:
4298           raise errors.OpPrereqError("No such instance name '%s'" % name)
4299         self.wanted_instances.append(instance)
4300     else:
4301       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4302                                in self.cfg.GetInstanceList()]
4303     return
4304
4305
4306   def _ComputeDiskStatus(self, instance, snode, dev):
4307     """Compute block device status.
4308
4309     """
4310     self.cfg.SetDiskID(dev, instance.primary_node)
4311     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4312     if dev.dev_type in constants.LDS_DRBD:
4313       # we change the snode then (otherwise we use the one passed in)
4314       if dev.logical_id[0] == instance.primary_node:
4315         snode = dev.logical_id[1]
4316       else:
4317         snode = dev.logical_id[0]
4318
4319     if snode:
4320       self.cfg.SetDiskID(dev, snode)
4321       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4322     else:
4323       dev_sstatus = None
4324
4325     if dev.children:
4326       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4327                       for child in dev.children]
4328     else:
4329       dev_children = []
4330
4331     data = {
4332       "iv_name": dev.iv_name,
4333       "dev_type": dev.dev_type,
4334       "logical_id": dev.logical_id,
4335       "physical_id": dev.physical_id,
4336       "pstatus": dev_pstatus,
4337       "sstatus": dev_sstatus,
4338       "children": dev_children,
4339       }
4340
4341     return data
4342
4343   def Exec(self, feedback_fn):
4344     """Gather and return data"""
4345     result = {}
4346     for instance in self.wanted_instances:
4347       remote_info = rpc.call_instance_info(instance.primary_node,
4348                                                 instance.name)
4349       if remote_info and "state" in remote_info:
4350         remote_state = "up"
4351       else:
4352         remote_state = "down"
4353       if instance.status == "down":
4354         config_state = "down"
4355       else:
4356         config_state = "up"
4357
4358       disks = [self._ComputeDiskStatus(instance, None, device)
4359                for device in instance.disks]
4360
4361       idict = {
4362         "name": instance.name,
4363         "config_state": config_state,
4364         "run_state": remote_state,
4365         "pnode": instance.primary_node,
4366         "snodes": instance.secondary_nodes,
4367         "os": instance.os,
4368         "memory": instance.memory,
4369         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4370         "disks": disks,
4371         "network_port": instance.network_port,
4372         "vcpus": instance.vcpus,
4373         "kernel_path": instance.kernel_path,
4374         "initrd_path": instance.initrd_path,
4375         "hvm_boot_order": instance.hvm_boot_order,
4376         }
4377
4378       result[instance.name] = idict
4379
4380     return result
4381
4382
4383 class LUSetInstanceParms(LogicalUnit):
4384   """Modifies an instances's parameters.
4385
4386   """
4387   HPATH = "instance-modify"
4388   HTYPE = constants.HTYPE_INSTANCE
4389   _OP_REQP = ["instance_name"]
4390
4391   def BuildHooksEnv(self):
4392     """Build hooks env.
4393
4394     This runs on the master, primary and secondaries.
4395
4396     """
4397     args = dict()
4398     if self.mem:
4399       args['memory'] = self.mem
4400     if self.vcpus:
4401       args['vcpus'] = self.vcpus
4402     if self.do_ip or self.do_bridge or self.mac:
4403       if self.do_ip:
4404         ip = self.ip
4405       else:
4406         ip = self.instance.nics[0].ip
4407       if self.bridge:
4408         bridge = self.bridge
4409       else:
4410         bridge = self.instance.nics[0].bridge
4411       if self.mac:
4412         mac = self.mac
4413       else:
4414         mac = self.instance.nics[0].mac
4415       args['nics'] = [(ip, bridge, mac)]
4416     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4417     nl = [self.sstore.GetMasterNode(),
4418           self.instance.primary_node] + list(self.instance.secondary_nodes)
4419     return env, nl, nl
4420
4421   def CheckPrereq(self):
4422     """Check prerequisites.
4423
4424     This only checks the instance list against the existing names.
4425
4426     """
4427     self.mem = getattr(self.op, "mem", None)
4428     self.vcpus = getattr(self.op, "vcpus", None)
4429     self.ip = getattr(self.op, "ip", None)
4430     self.mac = getattr(self.op, "mac", None)
4431     self.bridge = getattr(self.op, "bridge", None)
4432     self.kernel_path = getattr(self.op, "kernel_path", None)
4433     self.initrd_path = getattr(self.op, "initrd_path", None)
4434     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4435     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4436                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4437     if all_parms.count(None) == len(all_parms):
4438       raise errors.OpPrereqError("No changes submitted")
4439     if self.mem is not None:
4440       try:
4441         self.mem = int(self.mem)
4442       except ValueError, err:
4443         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4444     if self.vcpus is not None:
4445       try:
4446         self.vcpus = int(self.vcpus)
4447       except ValueError, err:
4448         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4449     if self.ip is not None:
4450       self.do_ip = True
4451       if self.ip.lower() == "none":
4452         self.ip = None
4453       else:
4454         if not utils.IsValidIP(self.ip):
4455           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4456     else:
4457       self.do_ip = False
4458     self.do_bridge = (self.bridge is not None)
4459     if self.mac is not None:
4460       if self.cfg.IsMacInUse(self.mac):
4461         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4462                                    self.mac)
4463       if not utils.IsValidMac(self.mac):
4464         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4465
4466     if self.kernel_path is not None:
4467       self.do_kernel_path = True
4468       if self.kernel_path == constants.VALUE_NONE:
4469         raise errors.OpPrereqError("Can't set instance to no kernel")
4470
4471       if self.kernel_path != constants.VALUE_DEFAULT:
4472         if not os.path.isabs(self.kernel_path):
4473           raise errors.OpPrereqError("The kernel path must be an absolute"
4474                                     " filename")
4475     else:
4476       self.do_kernel_path = False
4477
4478     if self.initrd_path is not None:
4479       self.do_initrd_path = True
4480       if self.initrd_path not in (constants.VALUE_NONE,
4481                                   constants.VALUE_DEFAULT):
4482         if not os.path.isabs(self.initrd_path):
4483           raise errors.OpPrereqError("The initrd path must be an absolute"
4484                                     " filename")
4485     else:
4486       self.do_initrd_path = False
4487
4488     # boot order verification
4489     if self.hvm_boot_order is not None:
4490       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4491         if len(self.hvm_boot_order.strip("acdn")) != 0:
4492           raise errors.OpPrereqError("invalid boot order specified,"
4493                                      " must be one or more of [acdn]"
4494                                      " or 'default'")
4495
4496     instance = self.cfg.GetInstanceInfo(
4497       self.cfg.ExpandInstanceName(self.op.instance_name))
4498     if instance is None:
4499       raise errors.OpPrereqError("No such instance name '%s'" %
4500                                  self.op.instance_name)
4501     self.op.instance_name = instance.name
4502     self.instance = instance
4503     return
4504
4505   def Exec(self, feedback_fn):
4506     """Modifies an instance.
4507
4508     All parameters take effect only at the next restart of the instance.
4509     """
4510     result = []
4511     instance = self.instance
4512     if self.mem:
4513       instance.memory = self.mem
4514       result.append(("mem", self.mem))
4515     if self.vcpus:
4516       instance.vcpus = self.vcpus
4517       result.append(("vcpus",  self.vcpus))
4518     if self.do_ip:
4519       instance.nics[0].ip = self.ip
4520       result.append(("ip", self.ip))
4521     if self.bridge:
4522       instance.nics[0].bridge = self.bridge
4523       result.append(("bridge", self.bridge))
4524     if self.mac:
4525       instance.nics[0].mac = self.mac
4526       result.append(("mac", self.mac))
4527     if self.do_kernel_path:
4528       instance.kernel_path = self.kernel_path
4529       result.append(("kernel_path", self.kernel_path))
4530     if self.do_initrd_path:
4531       instance.initrd_path = self.initrd_path
4532       result.append(("initrd_path", self.initrd_path))
4533     if self.hvm_boot_order:
4534       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4535         instance.hvm_boot_order = None
4536       else:
4537         instance.hvm_boot_order = self.hvm_boot_order
4538       result.append(("hvm_boot_order", self.hvm_boot_order))
4539
4540     self.cfg.AddInstance(instance)
4541
4542     return result
4543
4544
4545 class LUQueryExports(NoHooksLU):
4546   """Query the exports list
4547
4548   """
4549   _OP_REQP = []
4550
4551   def CheckPrereq(self):
4552     """Check that the nodelist contains only existing nodes.
4553
4554     """
4555     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4556
4557   def Exec(self, feedback_fn):
4558     """Compute the list of all the exported system images.
4559
4560     Returns:
4561       a dictionary with the structure node->(export-list)
4562       where export-list is a list of the instances exported on
4563       that node.
4564
4565     """
4566     return rpc.call_export_list(self.nodes)
4567
4568
4569 class LUExportInstance(LogicalUnit):
4570   """Export an instance to an image in the cluster.
4571
4572   """
4573   HPATH = "instance-export"
4574   HTYPE = constants.HTYPE_INSTANCE
4575   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4576
4577   def BuildHooksEnv(self):
4578     """Build hooks env.
4579
4580     This will run on the master, primary node and target node.
4581
4582     """
4583     env = {
4584       "EXPORT_NODE": self.op.target_node,
4585       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4586       }
4587     env.update(_BuildInstanceHookEnvByObject(self.instance))
4588     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4589           self.op.target_node]
4590     return env, nl, nl
4591
4592   def CheckPrereq(self):
4593     """Check prerequisites.
4594
4595     This checks that the instance and node names are valid.
4596
4597     """
4598     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4599     self.instance = self.cfg.GetInstanceInfo(instance_name)
4600     if self.instance is None:
4601       raise errors.OpPrereqError("Instance '%s' not found" %
4602                                  self.op.instance_name)
4603
4604     # node verification
4605     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4606     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4607
4608     if self.dst_node is None:
4609       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4610                                  self.op.target_node)
4611     self.op.target_node = self.dst_node.name
4612
4613   def Exec(self, feedback_fn):
4614     """Export an instance to an image in the cluster.
4615
4616     """
4617     instance = self.instance
4618     dst_node = self.dst_node
4619     src_node = instance.primary_node
4620     if self.op.shutdown:
4621       # shutdown the instance, but not the disks
4622       if not rpc.call_instance_shutdown(src_node, instance):
4623         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4624                                  (instance.name, src_node))
4625
4626     vgname = self.cfg.GetVGName()
4627
4628     snap_disks = []
4629
4630     try:
4631       for disk in instance.disks:
4632         if disk.iv_name == "sda":
4633           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4634           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4635
4636           if not new_dev_name:
4637             logger.Error("could not snapshot block device %s on node %s" %
4638                          (disk.logical_id[1], src_node))
4639           else:
4640             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4641                                       logical_id=(vgname, new_dev_name),
4642                                       physical_id=(vgname, new_dev_name),
4643                                       iv_name=disk.iv_name)
4644             snap_disks.append(new_dev)
4645
4646     finally:
4647       if self.op.shutdown and instance.status == "up":
4648         if not rpc.call_instance_start(src_node, instance, None):
4649           _ShutdownInstanceDisks(instance, self.cfg)
4650           raise errors.OpExecError("Could not start instance")
4651
4652     # TODO: check for size
4653
4654     for dev in snap_disks:
4655       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4656                                            instance):
4657         logger.Error("could not export block device %s from node"
4658                      " %s to node %s" %
4659                      (dev.logical_id[1], src_node, dst_node.name))
4660       if not rpc.call_blockdev_remove(src_node, dev):
4661         logger.Error("could not remove snapshot block device %s from"
4662                      " node %s" % (dev.logical_id[1], src_node))
4663
4664     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4665       logger.Error("could not finalize export for instance %s on node %s" %
4666                    (instance.name, dst_node.name))
4667
4668     nodelist = self.cfg.GetNodeList()
4669     nodelist.remove(dst_node.name)
4670
4671     # on one-node clusters nodelist will be empty after the removal
4672     # if we proceed the backup would be removed because OpQueryExports
4673     # substitutes an empty list with the full cluster node list.
4674     if nodelist:
4675       op = opcodes.OpQueryExports(nodes=nodelist)
4676       exportlist = self.proc.ChainOpCode(op)
4677       for node in exportlist:
4678         if instance.name in exportlist[node]:
4679           if not rpc.call_export_remove(node, instance.name):
4680             logger.Error("could not remove older export for instance %s"
4681                          " on node %s" % (instance.name, node))
4682
4683
4684 class LURemoveExport(NoHooksLU):
4685   """Remove exports related to the named instance.
4686
4687   """
4688   _OP_REQP = ["instance_name"]
4689
4690   def CheckPrereq(self):
4691     """Check prerequisites.
4692     """
4693     pass
4694
4695   def Exec(self, feedback_fn):
4696     """Remove any export.
4697
4698     """
4699     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4700     # If the instance was not found we'll try with the name that was passed in.
4701     # This will only work if it was an FQDN, though.
4702     fqdn_warn = False
4703     if not instance_name:
4704       fqdn_warn = True
4705       instance_name = self.op.instance_name
4706
4707     op = opcodes.OpQueryExports(nodes=[])
4708     exportlist = self.proc.ChainOpCode(op)
4709     found = False
4710     for node in exportlist:
4711       if instance_name in exportlist[node]:
4712         found = True
4713         if not rpc.call_export_remove(node, instance_name):
4714           logger.Error("could not remove export for instance %s"
4715                        " on node %s" % (instance_name, node))
4716
4717     if fqdn_warn and not found:
4718       feedback_fn("Export not found. If trying to remove an export belonging"
4719                   " to a deleted instance please use its Fully Qualified"
4720                   " Domain Name.")
4721
4722
4723 class TagsLU(NoHooksLU):
4724   """Generic tags LU.
4725
4726   This is an abstract class which is the parent of all the other tags LUs.
4727
4728   """
4729   def CheckPrereq(self):
4730     """Check prerequisites.
4731
4732     """
4733     if self.op.kind == constants.TAG_CLUSTER:
4734       self.target = self.cfg.GetClusterInfo()
4735     elif self.op.kind == constants.TAG_NODE:
4736       name = self.cfg.ExpandNodeName(self.op.name)
4737       if name is None:
4738         raise errors.OpPrereqError("Invalid node name (%s)" %
4739                                    (self.op.name,))
4740       self.op.name = name
4741       self.target = self.cfg.GetNodeInfo(name)
4742     elif self.op.kind == constants.TAG_INSTANCE:
4743       name = self.cfg.ExpandInstanceName(self.op.name)
4744       if name is None:
4745         raise errors.OpPrereqError("Invalid instance name (%s)" %
4746                                    (self.op.name,))
4747       self.op.name = name
4748       self.target = self.cfg.GetInstanceInfo(name)
4749     else:
4750       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4751                                  str(self.op.kind))
4752
4753
4754 class LUGetTags(TagsLU):
4755   """Returns the tags of a given object.
4756
4757   """
4758   _OP_REQP = ["kind", "name"]
4759
4760   def Exec(self, feedback_fn):
4761     """Returns the tag list.
4762
4763     """
4764     return self.target.GetTags()
4765
4766
4767 class LUSearchTags(NoHooksLU):
4768   """Searches the tags for a given pattern.
4769
4770   """
4771   _OP_REQP = ["pattern"]
4772
4773   def CheckPrereq(self):
4774     """Check prerequisites.
4775
4776     This checks the pattern passed for validity by compiling it.
4777
4778     """
4779     try:
4780       self.re = re.compile(self.op.pattern)
4781     except re.error, err:
4782       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4783                                  (self.op.pattern, err))
4784
4785   def Exec(self, feedback_fn):
4786     """Returns the tag list.
4787
4788     """
4789     cfg = self.cfg
4790     tgts = [("/cluster", cfg.GetClusterInfo())]
4791     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4792     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4793     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4794     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4795     results = []
4796     for path, target in tgts:
4797       for tag in target.GetTags():
4798         if self.re.search(tag):
4799           results.append((path, tag))
4800     return results
4801
4802
4803 class LUAddTags(TagsLU):
4804   """Sets a tag on a given object.
4805
4806   """
4807   _OP_REQP = ["kind", "name", "tags"]
4808
4809   def CheckPrereq(self):
4810     """Check prerequisites.
4811
4812     This checks the type and length of the tag name and value.
4813
4814     """
4815     TagsLU.CheckPrereq(self)
4816     for tag in self.op.tags:
4817       objects.TaggableObject.ValidateTag(tag)
4818
4819   def Exec(self, feedback_fn):
4820     """Sets the tag.
4821
4822     """
4823     try:
4824       for tag in self.op.tags:
4825         self.target.AddTag(tag)
4826     except errors.TagError, err:
4827       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4828     try:
4829       self.cfg.Update(self.target)
4830     except errors.ConfigurationError:
4831       raise errors.OpRetryError("There has been a modification to the"
4832                                 " config file and the operation has been"
4833                                 " aborted. Please retry.")
4834
4835
4836 class LUDelTags(TagsLU):
4837   """Delete a list of tags from a given object.
4838
4839   """
4840   _OP_REQP = ["kind", "name", "tags"]
4841
4842   def CheckPrereq(self):
4843     """Check prerequisites.
4844
4845     This checks that we have the given tag.
4846
4847     """
4848     TagsLU.CheckPrereq(self)
4849     for tag in self.op.tags:
4850       objects.TaggableObject.ValidateTag(tag)
4851     del_tags = frozenset(self.op.tags)
4852     cur_tags = self.target.GetTags()
4853     if not del_tags <= cur_tags:
4854       diff_tags = del_tags - cur_tags
4855       diff_names = ["'%s'" % tag for tag in diff_tags]
4856       diff_names.sort()
4857       raise errors.OpPrereqError("Tag(s) %s not found" %
4858                                  (",".join(diff_names)))
4859
4860   def Exec(self, feedback_fn):
4861     """Remove the tag from the object.
4862
4863     """
4864     for tag in self.op.tags:
4865       self.target.RemoveTag(tag)
4866     try:
4867       self.cfg.Update(self.target)
4868     except errors.ConfigurationError:
4869       raise errors.OpRetryError("There has been a modification to the"
4870                                 " config file and the operation has been"
4871                                 " aborted. Please retry.")
4872
4873 class LUTestDelay(NoHooksLU):
4874   """Sleep for a specified amount of time.
4875
4876   This LU sleeps on the master and/or nodes for a specified amoutn of
4877   time.
4878
4879   """
4880   _OP_REQP = ["duration", "on_master", "on_nodes"]
4881
4882   def CheckPrereq(self):
4883     """Check prerequisites.
4884
4885     This checks that we have a good list of nodes and/or the duration
4886     is valid.
4887
4888     """
4889
4890     if self.op.on_nodes:
4891       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4892
4893   def Exec(self, feedback_fn):
4894     """Do the actual sleep.
4895
4896     """
4897     if self.op.on_master:
4898       if not utils.TestDelay(self.op.duration):
4899         raise errors.OpExecError("Error during master delay test")
4900     if self.op.on_nodes:
4901       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4902       if not result:
4903         raise errors.OpExecError("Complete failure from rpc call")
4904       for node, node_result in result.items():
4905         if not node_result:
4906           raise errors.OpExecError("Failure during rpc call to node %s,"
4907                                    " result: %s" % (node, node_result))
4908
4909
4910 class IAllocator(object):
4911   """IAllocator framework.
4912
4913   An IAllocator instance has three sets of attributes:
4914     - cfg/sstore that are needed to query the cluster
4915     - input data (all members of the _KEYS class attribute are required)
4916     - four buffer attributes (in|out_data|text), that represent the
4917       input (to the external script) in text and data structure format,
4918       and the output from it, again in two formats
4919     - the result variables from the script (success, info, nodes) for
4920       easy usage
4921
4922   """
4923   _ALLO_KEYS = [
4924     "mem_size", "disks", "disk_template",
4925     "os", "tags", "nics", "vcpus",
4926     ]
4927   _RELO_KEYS = [
4928     "relocate_from",
4929     ]
4930
4931   def __init__(self, cfg, sstore, mode, name, **kwargs):
4932     self.cfg = cfg
4933     self.sstore = sstore
4934     # init buffer variables
4935     self.in_text = self.out_text = self.in_data = self.out_data = None
4936     # init all input fields so that pylint is happy
4937     self.mode = mode
4938     self.name = name
4939     self.mem_size = self.disks = self.disk_template = None
4940     self.os = self.tags = self.nics = self.vcpus = None
4941     self.relocate_from = None
4942     # computed fields
4943     self.required_nodes = None
4944     # init result fields
4945     self.success = self.info = self.nodes = None
4946     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4947       keyset = self._ALLO_KEYS
4948     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4949       keyset = self._RELO_KEYS
4950     else:
4951       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4952                                    " IAllocator" % self.mode)
4953     for key in kwargs:
4954       if key not in keyset:
4955         raise errors.ProgrammerError("Invalid input parameter '%s' to"
4956                                      " IAllocator" % key)
4957       setattr(self, key, kwargs[key])
4958     for key in keyset:
4959       if key not in kwargs:
4960         raise errors.ProgrammerError("Missing input parameter '%s' to"
4961                                      " IAllocator" % key)
4962     self._BuildInputData()
4963
4964   def _ComputeClusterData(self):
4965     """Compute the generic allocator input data.
4966
4967     This is the data that is independent of the actual operation.
4968
4969     """
4970     cfg = self.cfg
4971     # cluster data
4972     data = {
4973       "version": 1,
4974       "cluster_name": self.sstore.GetClusterName(),
4975       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4976       # we don't have job IDs
4977       }
4978
4979     # node data
4980     node_results = {}
4981     node_list = cfg.GetNodeList()
4982     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4983     for nname in node_list:
4984       ninfo = cfg.GetNodeInfo(nname)
4985       if nname not in node_data or not isinstance(node_data[nname], dict):
4986         raise errors.OpExecError("Can't get data for node %s" % nname)
4987       remote_info = node_data[nname]
4988       for attr in ['memory_total', 'memory_free',
4989                    'vg_size', 'vg_free']:
4990         if attr not in remote_info:
4991           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4992                                    (nname, attr))
4993         try:
4994           int(remote_info[attr])
4995         except ValueError, err:
4996           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4997                                    " %s" % (nname, attr, str(err)))
4998       pnr = {
4999         "tags": list(ninfo.GetTags()),
5000         "total_memory": utils.TryConvert(int, remote_info['memory_total']),
5001         "free_memory": utils.TryConvert(int, remote_info['memory_free']),
5002         "total_disk": utils.TryConvert(int, remote_info['vg_size']),
5003         "free_disk": utils.TryConvert(int, remote_info['vg_free']),
5004         "primary_ip": ninfo.primary_ip,
5005         "secondary_ip": ninfo.secondary_ip,
5006         }
5007       node_results[nname] = pnr
5008     data["nodes"] = node_results
5009
5010     # instance data
5011     instance_data = {}
5012     i_list = cfg.GetInstanceList()
5013     for iname in i_list:
5014       iinfo = cfg.GetInstanceInfo(iname)
5015       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5016                   for n in iinfo.nics]
5017       pir = {
5018         "tags": list(iinfo.GetTags()),
5019         "should_run": iinfo.status == "up",
5020         "vcpus": iinfo.vcpus,
5021         "memory": iinfo.memory,
5022         "os": iinfo.os,
5023         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5024         "nics": nic_data,
5025         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5026         "disk_template": iinfo.disk_template,
5027         }
5028       instance_data[iname] = pir
5029
5030     data["instances"] = instance_data
5031
5032     self.in_data = data
5033
5034   def _AddNewInstance(self):
5035     """Add new instance data to allocator structure.
5036
5037     This in combination with _AllocatorGetClusterData will create the
5038     correct structure needed as input for the allocator.
5039
5040     The checks for the completeness of the opcode must have already been
5041     done.
5042
5043     """
5044     data = self.in_data
5045     if len(self.disks) != 2:
5046       raise errors.OpExecError("Only two-disk configurations supported")
5047
5048     disk_space = _ComputeDiskSize(self.disk_template,
5049                                   self.disks[0]["size"], self.disks[1]["size"])
5050
5051     if self.disk_template in constants.DTS_NET_MIRROR:
5052       self.required_nodes = 2
5053     else:
5054       self.required_nodes = 1
5055     request = {
5056       "type": "allocate",
5057       "name": self.name,
5058       "disk_template": self.disk_template,
5059       "tags": self.tags,
5060       "os": self.os,
5061       "vcpus": self.vcpus,
5062       "memory": self.mem_size,
5063       "disks": self.disks,
5064       "disk_space_total": disk_space,
5065       "nics": self.nics,
5066       "required_nodes": self.required_nodes,
5067       }
5068     data["request"] = request
5069
5070   def _AddRelocateInstance(self):
5071     """Add relocate instance data to allocator structure.
5072
5073     This in combination with _IAllocatorGetClusterData will create the
5074     correct structure needed as input for the allocator.
5075
5076     The checks for the completeness of the opcode must have already been
5077     done.
5078
5079     """
5080     instance = self.cfg.GetInstanceInfo(self.name)
5081     if instance is None:
5082       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5083                                    " IAllocator" % self.name)
5084
5085     if instance.disk_template not in constants.DTS_NET_MIRROR:
5086       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5087
5088     if len(instance.secondary_nodes) != 1:
5089       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5090
5091     self.required_nodes = 1
5092
5093     disk_space = _ComputeDiskSize(instance.disk_template,
5094                                   instance.disks[0].size,
5095                                   instance.disks[1].size)
5096
5097     request = {
5098       "type": "relocate",
5099       "name": self.name,
5100       "disk_space_total": disk_space,
5101       "required_nodes": self.required_nodes,
5102       "relocate_from": self.relocate_from,
5103       }
5104     self.in_data["request"] = request
5105
5106   def _BuildInputData(self):
5107     """Build input data structures.
5108
5109     """
5110     self._ComputeClusterData()
5111
5112     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5113       self._AddNewInstance()
5114     else:
5115       self._AddRelocateInstance()
5116
5117     self.in_text = serializer.Dump(self.in_data)
5118
5119   def Run(self, name, validate=True):
5120     """Run an instance allocator and return the results.
5121
5122     """
5123     data = self.in_text
5124
5125     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
5126                                   os.path.isfile)
5127     if alloc_script is None:
5128       raise errors.OpExecError("Can't find allocator '%s'" % name)
5129
5130     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
5131     try:
5132       os.write(fd, data)
5133       os.close(fd)
5134       result = utils.RunCmd([alloc_script, fin_name])
5135       if result.failed:
5136         raise errors.OpExecError("Instance allocator call failed: %s,"
5137                                  " output: %s" %
5138                                  (result.fail_reason, result.output))
5139     finally:
5140       os.unlink(fin_name)
5141     self.out_text = result.stdout
5142     if validate:
5143       self._ValidateResult()
5144
5145   def _ValidateResult(self):
5146     """Process the allocator results.
5147
5148     This will process and if successful save the result in
5149     self.out_data and the other parameters.
5150
5151     """
5152     try:
5153       rdict = serializer.Load(self.out_text)
5154     except Exception, err:
5155       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5156
5157     if not isinstance(rdict, dict):
5158       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5159
5160     for key in "success", "info", "nodes":
5161       if key not in rdict:
5162         raise errors.OpExecError("Can't parse iallocator results:"
5163                                  " missing key '%s'" % key)
5164       setattr(self, key, rdict[key])
5165
5166     if not isinstance(rdict["nodes"], list):
5167       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5168                                " is not a list")
5169     self.out_data = rdict
5170
5171
5172 class LUTestAllocator(NoHooksLU):
5173   """Run allocator tests.
5174
5175   This LU runs the allocator tests
5176
5177   """
5178   _OP_REQP = ["direction", "mode", "name"]
5179
5180   def CheckPrereq(self):
5181     """Check prerequisites.
5182
5183     This checks the opcode parameters depending on the director and mode test.
5184
5185     """
5186     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5187       for attr in ["name", "mem_size", "disks", "disk_template",
5188                    "os", "tags", "nics", "vcpus"]:
5189         if not hasattr(self.op, attr):
5190           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5191                                      attr)
5192       iname = self.cfg.ExpandInstanceName(self.op.name)
5193       if iname is not None:
5194         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5195                                    iname)
5196       if not isinstance(self.op.nics, list):
5197         raise errors.OpPrereqError("Invalid parameter 'nics'")
5198       for row in self.op.nics:
5199         if (not isinstance(row, dict) or
5200             "mac" not in row or
5201             "ip" not in row or
5202             "bridge" not in row):
5203           raise errors.OpPrereqError("Invalid contents of the"
5204                                      " 'nics' parameter")
5205       if not isinstance(self.op.disks, list):
5206         raise errors.OpPrereqError("Invalid parameter 'disks'")
5207       if len(self.op.disks) != 2:
5208         raise errors.OpPrereqError("Only two-disk configurations supported")
5209       for row in self.op.disks:
5210         if (not isinstance(row, dict) or
5211             "size" not in row or
5212             not isinstance(row["size"], int) or
5213             "mode" not in row or
5214             row["mode"] not in ['r', 'w']):
5215           raise errors.OpPrereqError("Invalid contents of the"
5216                                      " 'disks' parameter")
5217     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5218       if not hasattr(self.op, "name"):
5219         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5220       fname = self.cfg.ExpandInstanceName(self.op.name)
5221       if fname is None:
5222         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5223                                    self.op.name)
5224       self.op.name = fname
5225       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5226     else:
5227       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5228                                  self.op.mode)
5229
5230     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5231       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5232         raise errors.OpPrereqError("Missing allocator name")
5233     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5234       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5235                                  self.op.direction)
5236
5237   def Exec(self, feedback_fn):
5238     """Run the allocator test.
5239
5240     """
5241     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5242       ial = IAllocator(self.cfg, self.sstore,
5243                        mode=self.op.mode,
5244                        name=self.op.name,
5245                        mem_size=self.op.mem_size,
5246                        disks=self.op.disks,
5247                        disk_template=self.op.disk_template,
5248                        os=self.op.os,
5249                        tags=self.op.tags,
5250                        nics=self.op.nics,
5251                        vcpus=self.op.vcpus,
5252                        )
5253     else:
5254       ial = IAllocator(self.cfg, self.sstore,
5255                        mode=self.op.mode,
5256                        name=self.op.name,
5257                        relocate_from=list(self.relocate_from),
5258                        )
5259
5260     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5261       result = ial.in_text
5262     else:
5263       ial.Run(self.op.allocator, validate=False)
5264       result = ial.out_text
5265     return result