Modify LURenameInstance to support file backend
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.proc = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     self.__ssh = None
78
79     for attr_name in self._OP_REQP:
80       attr_val = getattr(op, attr_name, None)
81       if attr_val is None:
82         raise errors.OpPrereqError("Required parameter '%s' missing" %
83                                    attr_name)
84     if self.REQ_CLUSTER:
85       if not cfg.IsCluster():
86         raise errors.OpPrereqError("Cluster not initialized yet,"
87                                    " use 'gnt-cluster init' first.")
88       if self.REQ_MASTER:
89         master = sstore.GetMasterNode()
90         if master != utils.HostInfo().name:
91           raise errors.OpPrereqError("Commands must be run on the master"
92                                      " node %s" % master)
93
94   def __GetSSH(self):
95     """Returns the SshRunner object
96
97     """
98     if not self.__ssh:
99       self.__ssh = ssh.SshRunner(self.sstore)
100     return self.__ssh
101
102   ssh = property(fget=__GetSSH)
103
104   def CheckPrereq(self):
105     """Check prerequisites for this LU.
106
107     This method should check that the prerequisites for the execution
108     of this LU are fulfilled. It can do internode communication, but
109     it should be idempotent - no cluster or system changes are
110     allowed.
111
112     The method should raise errors.OpPrereqError in case something is
113     not fulfilled. Its return value is ignored.
114
115     This method should also update all the parameters of the opcode to
116     their canonical form; e.g. a short node name must be fully
117     expanded after this method has successfully completed (so that
118     hooks, logging, etc. work correctly).
119
120     """
121     raise NotImplementedError
122
123   def Exec(self, feedback_fn):
124     """Execute the LU.
125
126     This method should implement the actual work. It should raise
127     errors.OpExecError for failures that are somewhat dealt with in
128     code, or expected.
129
130     """
131     raise NotImplementedError
132
133   def BuildHooksEnv(self):
134     """Build hooks environment for this LU.
135
136     This method should return a three-node tuple consisting of: a dict
137     containing the environment that will be used for running the
138     specific hook for this LU, a list of node names on which the hook
139     should run before the execution, and a list of node names on which
140     the hook should run after the execution.
141
142     The keys of the dict must not have 'GANETI_' prefixed as this will
143     be handled in the hooks runner. Also note additional keys will be
144     added by the hooks runner. If the LU doesn't define any
145     environment, an empty dict (and not None) should be returned.
146
147     As for the node lists, the master should not be included in the
148     them, as it will be added by the hooks runner in case this LU
149     requires a cluster to run on (otherwise we don't have a node
150     list). No nodes should be returned as an empty list (and not
151     None).
152
153     Note that if the HPATH for a LU class is None, this function will
154     not be called.
155
156     """
157     raise NotImplementedError
158
159
160 class NoHooksLU(LogicalUnit):
161   """Simple LU which runs no hooks.
162
163   This LU is intended as a parent for other LogicalUnits which will
164   run no hooks, in order to reduce duplicate code.
165
166   """
167   HPATH = None
168   HTYPE = None
169
170   def BuildHooksEnv(self):
171     """Build hooks env.
172
173     This is a no-op, since we don't run hooks.
174
175     """
176     return {}, [], []
177
178
179 def _AddHostToEtcHosts(hostname):
180   """Wrapper around utils.SetEtcHostsEntry.
181
182   """
183   hi = utils.HostInfo(name=hostname)
184   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
185
186
187 def _RemoveHostFromEtcHosts(hostname):
188   """Wrapper around utils.RemoveEtcHostsEntry.
189
190   """
191   hi = utils.HostInfo(name=hostname)
192   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
193   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
194
195
196 def _GetWantedNodes(lu, nodes):
197   """Returns list of checked and expanded node names.
198
199   Args:
200     nodes: List of nodes (strings) or None for all
201
202   """
203   if not isinstance(nodes, list):
204     raise errors.OpPrereqError("Invalid argument type 'nodes'")
205
206   if nodes:
207     wanted = []
208
209     for name in nodes:
210       node = lu.cfg.ExpandNodeName(name)
211       if node is None:
212         raise errors.OpPrereqError("No such node name '%s'" % name)
213       wanted.append(node)
214
215   else:
216     wanted = lu.cfg.GetNodeList()
217   return utils.NiceSort(wanted)
218
219
220 def _GetWantedInstances(lu, instances):
221   """Returns list of checked and expanded instance names.
222
223   Args:
224     instances: List of instances (strings) or None for all
225
226   """
227   if not isinstance(instances, list):
228     raise errors.OpPrereqError("Invalid argument type 'instances'")
229
230   if instances:
231     wanted = []
232
233     for name in instances:
234       instance = lu.cfg.ExpandInstanceName(name)
235       if instance is None:
236         raise errors.OpPrereqError("No such instance name '%s'" % name)
237       wanted.append(instance)
238
239   else:
240     wanted = lu.cfg.GetInstanceList()
241   return utils.NiceSort(wanted)
242
243
244 def _CheckOutputFields(static, dynamic, selected):
245   """Checks whether all selected fields are valid.
246
247   Args:
248     static: Static fields
249     dynamic: Dynamic fields
250
251   """
252   static_fields = frozenset(static)
253   dynamic_fields = frozenset(dynamic)
254
255   all_fields = static_fields | dynamic_fields
256
257   if not all_fields.issuperset(selected):
258     raise errors.OpPrereqError("Unknown output fields selected: %s"
259                                % ",".join(frozenset(selected).
260                                           difference(all_fields)))
261
262
263 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
264                           memory, vcpus, nics):
265   """Builds instance related env variables for hooks from single variables.
266
267   Args:
268     secondary_nodes: List of secondary nodes as strings
269   """
270   env = {
271     "OP_TARGET": name,
272     "INSTANCE_NAME": name,
273     "INSTANCE_PRIMARY": primary_node,
274     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
275     "INSTANCE_OS_TYPE": os_type,
276     "INSTANCE_STATUS": status,
277     "INSTANCE_MEMORY": memory,
278     "INSTANCE_VCPUS": vcpus,
279   }
280
281   if nics:
282     nic_count = len(nics)
283     for idx, (ip, bridge, mac) in enumerate(nics):
284       if ip is None:
285         ip = ""
286       env["INSTANCE_NIC%d_IP" % idx] = ip
287       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
288       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
289   else:
290     nic_count = 0
291
292   env["INSTANCE_NIC_COUNT"] = nic_count
293
294   return env
295
296
297 def _BuildInstanceHookEnvByObject(instance, override=None):
298   """Builds instance related env variables for hooks from an object.
299
300   Args:
301     instance: objects.Instance object of instance
302     override: dict of values to override
303   """
304   args = {
305     'name': instance.name,
306     'primary_node': instance.primary_node,
307     'secondary_nodes': instance.secondary_nodes,
308     'os_type': instance.os,
309     'status': instance.os,
310     'memory': instance.memory,
311     'vcpus': instance.vcpus,
312     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
313   }
314   if override:
315     args.update(override)
316   return _BuildInstanceHookEnv(**args)
317
318
319 def _HasValidVG(vglist, vgname):
320   """Checks if the volume group list is valid.
321
322   A non-None return value means there's an error, and the return value
323   is the error message.
324
325   """
326   vgsize = vglist.get(vgname, None)
327   if vgsize is None:
328     return "volume group '%s' missing" % vgname
329   elif vgsize < 20480:
330     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
331             (vgname, vgsize))
332   return None
333
334
335 def _InitSSHSetup(node):
336   """Setup the SSH configuration for the cluster.
337
338
339   This generates a dsa keypair for root, adds the pub key to the
340   permitted hosts and adds the hostkey to its own known hosts.
341
342   Args:
343     node: the name of this host as a fqdn
344
345   """
346   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
347
348   for name in priv_key, pub_key:
349     if os.path.exists(name):
350       utils.CreateBackup(name)
351     utils.RemoveFile(name)
352
353   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
354                          "-f", priv_key,
355                          "-q", "-N", ""])
356   if result.failed:
357     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
358                              result.output)
359
360   f = open(pub_key, 'r')
361   try:
362     utils.AddAuthorizedKey(auth_keys, f.read(8192))
363   finally:
364     f.close()
365
366
367 def _InitGanetiServerSetup(ss):
368   """Setup the necessary configuration for the initial node daemon.
369
370   This creates the nodepass file containing the shared password for
371   the cluster and also generates the SSL certificate.
372
373   """
374   # Create pseudo random password
375   randpass = sha.new(os.urandom(64)).hexdigest()
376   # and write it into sstore
377   ss.SetKey(ss.SS_NODED_PASS, randpass)
378
379   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
380                          "-days", str(365*5), "-nodes", "-x509",
381                          "-keyout", constants.SSL_CERT_FILE,
382                          "-out", constants.SSL_CERT_FILE, "-batch"])
383   if result.failed:
384     raise errors.OpExecError("could not generate server ssl cert, command"
385                              " %s had exitcode %s and error message %s" %
386                              (result.cmd, result.exit_code, result.output))
387
388   os.chmod(constants.SSL_CERT_FILE, 0400)
389
390   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
391
392   if result.failed:
393     raise errors.OpExecError("Could not start the node daemon, command %s"
394                              " had exitcode %s and error %s" %
395                              (result.cmd, result.exit_code, result.output))
396
397
398 def _CheckInstanceBridgesExist(instance):
399   """Check that the brigdes needed by an instance exist.
400
401   """
402   # check bridges existance
403   brlist = [nic.bridge for nic in instance.nics]
404   if not rpc.call_bridges_exist(instance.primary_node, brlist):
405     raise errors.OpPrereqError("one or more target bridges %s does not"
406                                " exist on destination node '%s'" %
407                                (brlist, instance.primary_node))
408
409
410 class LUInitCluster(LogicalUnit):
411   """Initialise the cluster.
412
413   """
414   HPATH = "cluster-init"
415   HTYPE = constants.HTYPE_CLUSTER
416   _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
417               "def_bridge", "master_netdev", "file_storage_dir"]
418   REQ_CLUSTER = False
419
420   def BuildHooksEnv(self):
421     """Build hooks env.
422
423     Notes: Since we don't require a cluster, we must manually add
424     ourselves in the post-run node list.
425
426     """
427     env = {"OP_TARGET": self.op.cluster_name}
428     return env, [], [self.hostname.name]
429
430   def CheckPrereq(self):
431     """Verify that the passed name is a valid one.
432
433     """
434     if config.ConfigWriter.IsCluster():
435       raise errors.OpPrereqError("Cluster is already initialised")
436
437     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
438       if not os.path.exists(constants.VNC_PASSWORD_FILE):
439         raise errors.OpPrereqError("Please prepare the cluster VNC"
440                                    "password file %s" %
441                                    constants.VNC_PASSWORD_FILE)
442
443     self.hostname = hostname = utils.HostInfo()
444
445     if hostname.ip.startswith("127."):
446       raise errors.OpPrereqError("This host's IP resolves to the private"
447                                  " range (%s). Please fix DNS or %s." %
448                                  (hostname.ip, constants.ETC_HOSTS))
449
450     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
451                          source=constants.LOCALHOST_IP_ADDRESS):
452       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
453                                  " to %s,\nbut this ip address does not"
454                                  " belong to this host."
455                                  " Aborting." % hostname.ip)
456
457     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
458
459     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
460                      timeout=5):
461       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
462
463     secondary_ip = getattr(self.op, "secondary_ip", None)
464     if secondary_ip and not utils.IsValidIP(secondary_ip):
465       raise errors.OpPrereqError("Invalid secondary ip given")
466     if (secondary_ip and
467         secondary_ip != hostname.ip and
468         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
469                            source=constants.LOCALHOST_IP_ADDRESS))):
470       raise errors.OpPrereqError("You gave %s as secondary IP,"
471                                  " but it does not belong to this host." %
472                                  secondary_ip)
473     self.secondary_ip = secondary_ip
474
475     if not hasattr(self.op, "vg_name"):
476       self.op.vg_name = None
477     # if vg_name not None, checks if volume group is valid
478     if self.op.vg_name:
479       vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
480       if vgstatus:
481         raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
482                                    " you are not using lvm" % vgstatus)
483
484     self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
485
486     if not os.path.isabs(self.op.file_storage_dir):
487       raise errors.OpPrereqError("The file storage directory you have is"
488                                  " not an absolute path.")
489
490     if not os.path.exists(self.op.file_storage_dir):
491       try:
492         os.makedirs(self.op.file_storage_dir, 0750)
493       except OSError, err:
494         raise errors.OpPrereqError("Cannot create file storage directory"
495                                    " '%s': %s" %
496                                    (self.op.file_storage_dir, err))
497
498     if not os.path.isdir(self.op.file_storage_dir):
499       raise errors.OpPrereqError("The file storage directory '%s' is not"
500                                  " a directory." % self.op.file_storage_dir)
501
502     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
503                     self.op.mac_prefix):
504       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
505                                  self.op.mac_prefix)
506
507     if self.op.hypervisor_type not in constants.HYPER_TYPES:
508       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
509                                  self.op.hypervisor_type)
510
511     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
512     if result.failed:
513       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
514                                  (self.op.master_netdev,
515                                   result.output.strip()))
516
517     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
518             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
519       raise errors.OpPrereqError("Init.d script '%s' missing or not"
520                                  " executable." % constants.NODE_INITD_SCRIPT)
521
522   def Exec(self, feedback_fn):
523     """Initialize the cluster.
524
525     """
526     clustername = self.clustername
527     hostname = self.hostname
528
529     # set up the simple store
530     self.sstore = ss = ssconf.SimpleStore()
531     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
532     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
533     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
534     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
535     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
536     ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
537
538     # set up the inter-node password and certificate
539     _InitGanetiServerSetup(ss)
540
541     # start the master ip
542     rpc.call_node_start_master(hostname.name)
543
544     # set up ssh config and /etc/hosts
545     f = open(constants.SSH_HOST_RSA_PUB, 'r')
546     try:
547       sshline = f.read()
548     finally:
549       f.close()
550     sshkey = sshline.split(" ")[1]
551
552     _AddHostToEtcHosts(hostname.name)
553     _InitSSHSetup(hostname.name)
554
555     # init of cluster config file
556     self.cfg = cfgw = config.ConfigWriter()
557     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
558                     sshkey, self.op.mac_prefix,
559                     self.op.vg_name, self.op.def_bridge)
560
561     ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
562
563
564 class LUDestroyCluster(NoHooksLU):
565   """Logical unit for destroying the cluster.
566
567   """
568   _OP_REQP = []
569
570   def CheckPrereq(self):
571     """Check prerequisites.
572
573     This checks whether the cluster is empty.
574
575     Any errors are signalled by raising errors.OpPrereqError.
576
577     """
578     master = self.sstore.GetMasterNode()
579
580     nodelist = self.cfg.GetNodeList()
581     if len(nodelist) != 1 or nodelist[0] != master:
582       raise errors.OpPrereqError("There are still %d node(s) in"
583                                  " this cluster." % (len(nodelist) - 1))
584     instancelist = self.cfg.GetInstanceList()
585     if instancelist:
586       raise errors.OpPrereqError("There are still %d instance(s) in"
587                                  " this cluster." % len(instancelist))
588
589   def Exec(self, feedback_fn):
590     """Destroys the cluster.
591
592     """
593     master = self.sstore.GetMasterNode()
594     if not rpc.call_node_stop_master(master):
595       raise errors.OpExecError("Could not disable the master role")
596     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
597     utils.CreateBackup(priv_key)
598     utils.CreateBackup(pub_key)
599     rpc.call_node_leave_cluster(master)
600
601
602 class LUVerifyCluster(NoHooksLU):
603   """Verifies the cluster status.
604
605   """
606   _OP_REQP = []
607
608   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
609                   remote_version, feedback_fn):
610     """Run multiple tests against a node.
611
612     Test list:
613       - compares ganeti version
614       - checks vg existance and size > 20G
615       - checks config file checksum
616       - checks ssh to other nodes
617
618     Args:
619       node: name of the node to check
620       file_list: required list of files
621       local_cksum: dictionary of local files and their checksums
622
623     """
624     # compares ganeti version
625     local_version = constants.PROTOCOL_VERSION
626     if not remote_version:
627       feedback_fn(" - ERROR: connection to %s failed" % (node))
628       return True
629
630     if local_version != remote_version:
631       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
632                       (local_version, node, remote_version))
633       return True
634
635     # checks vg existance and size > 20G
636
637     bad = False
638     if not vglist:
639       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
640                       (node,))
641       bad = True
642     else:
643       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
644       if vgstatus:
645         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
646         bad = True
647
648     # checks config file checksum
649     # checks ssh to any
650
651     if 'filelist' not in node_result:
652       bad = True
653       feedback_fn("  - ERROR: node hasn't returned file checksum data")
654     else:
655       remote_cksum = node_result['filelist']
656       for file_name in file_list:
657         if file_name not in remote_cksum:
658           bad = True
659           feedback_fn("  - ERROR: file '%s' missing" % file_name)
660         elif remote_cksum[file_name] != local_cksum[file_name]:
661           bad = True
662           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
663
664     if 'nodelist' not in node_result:
665       bad = True
666       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
667     else:
668       if node_result['nodelist']:
669         bad = True
670         for node in node_result['nodelist']:
671           feedback_fn("  - ERROR: communication with node '%s': %s" %
672                           (node, node_result['nodelist'][node]))
673     hyp_result = node_result.get('hypervisor', None)
674     if hyp_result is not None:
675       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
676     return bad
677
678   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
679     """Verify an instance.
680
681     This function checks to see if the required block devices are
682     available on the instance's node.
683
684     """
685     bad = False
686
687     instancelist = self.cfg.GetInstanceList()
688     if not instance in instancelist:
689       feedback_fn("  - ERROR: instance %s not in instance list %s" %
690                       (instance, instancelist))
691       bad = True
692
693     instanceconfig = self.cfg.GetInstanceInfo(instance)
694     node_current = instanceconfig.primary_node
695
696     node_vol_should = {}
697     instanceconfig.MapLVsByNode(node_vol_should)
698
699     for node in node_vol_should:
700       for volume in node_vol_should[node]:
701         if node not in node_vol_is or volume not in node_vol_is[node]:
702           feedback_fn("  - ERROR: volume %s missing on node %s" %
703                           (volume, node))
704           bad = True
705
706     if not instanceconfig.status == 'down':
707       if not instance in node_instance[node_current]:
708         feedback_fn("  - ERROR: instance %s not running on node %s" %
709                         (instance, node_current))
710         bad = True
711
712     for node in node_instance:
713       if (not node == node_current):
714         if instance in node_instance[node]:
715           feedback_fn("  - ERROR: instance %s should not run on node %s" %
716                           (instance, node))
717           bad = True
718
719     return bad
720
721   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
722     """Verify if there are any unknown volumes in the cluster.
723
724     The .os, .swap and backup volumes are ignored. All other volumes are
725     reported as unknown.
726
727     """
728     bad = False
729
730     for node in node_vol_is:
731       for volume in node_vol_is[node]:
732         if node not in node_vol_should or volume not in node_vol_should[node]:
733           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
734                       (volume, node))
735           bad = True
736     return bad
737
738   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
739     """Verify the list of running instances.
740
741     This checks what instances are running but unknown to the cluster.
742
743     """
744     bad = False
745     for node in node_instance:
746       for runninginstance in node_instance[node]:
747         if runninginstance not in instancelist:
748           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
749                           (runninginstance, node))
750           bad = True
751     return bad
752
753   def CheckPrereq(self):
754     """Check prerequisites.
755
756     This has no prerequisites.
757
758     """
759     pass
760
761   def Exec(self, feedback_fn):
762     """Verify integrity of cluster, performing various test on nodes.
763
764     """
765     bad = False
766     feedback_fn("* Verifying global settings")
767     for msg in self.cfg.VerifyConfig():
768       feedback_fn("  - ERROR: %s" % msg)
769
770     vg_name = self.cfg.GetVGName()
771     nodelist = utils.NiceSort(self.cfg.GetNodeList())
772     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
773     node_volume = {}
774     node_instance = {}
775
776     # FIXME: verify OS list
777     # do local checksums
778     file_names = list(self.sstore.GetFileList())
779     file_names.append(constants.SSL_CERT_FILE)
780     file_names.append(constants.CLUSTER_CONF_FILE)
781     local_checksums = utils.FingerprintFiles(file_names)
782
783     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
784     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
785     all_instanceinfo = rpc.call_instance_list(nodelist)
786     all_vglist = rpc.call_vg_list(nodelist)
787     node_verify_param = {
788       'filelist': file_names,
789       'nodelist': nodelist,
790       'hypervisor': None,
791       }
792     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
793     all_rversion = rpc.call_version(nodelist)
794
795     for node in nodelist:
796       feedback_fn("* Verifying node %s" % node)
797       result = self._VerifyNode(node, file_names, local_checksums,
798                                 all_vglist[node], all_nvinfo[node],
799                                 all_rversion[node], feedback_fn)
800       bad = bad or result
801
802       # node_volume
803       volumeinfo = all_volumeinfo[node]
804
805       if isinstance(volumeinfo, basestring):
806         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
807                     (node, volumeinfo[-400:].encode('string_escape')))
808         bad = True
809         node_volume[node] = {}
810       elif not isinstance(volumeinfo, dict):
811         feedback_fn("  - ERROR: connection to %s failed" % (node,))
812         bad = True
813         continue
814       else:
815         node_volume[node] = volumeinfo
816
817       # node_instance
818       nodeinstance = all_instanceinfo[node]
819       if type(nodeinstance) != list:
820         feedback_fn("  - ERROR: connection to %s failed" % (node,))
821         bad = True
822         continue
823
824       node_instance[node] = nodeinstance
825
826     node_vol_should = {}
827
828     for instance in instancelist:
829       feedback_fn("* Verifying instance %s" % instance)
830       result =  self._VerifyInstance(instance, node_volume, node_instance,
831                                      feedback_fn)
832       bad = bad or result
833
834       inst_config = self.cfg.GetInstanceInfo(instance)
835
836       inst_config.MapLVsByNode(node_vol_should)
837
838     feedback_fn("* Verifying orphan volumes")
839     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
840                                        feedback_fn)
841     bad = bad or result
842
843     feedback_fn("* Verifying remaining instances")
844     result = self._VerifyOrphanInstances(instancelist, node_instance,
845                                          feedback_fn)
846     bad = bad or result
847
848     return int(bad)
849
850
851 class LUVerifyDisks(NoHooksLU):
852   """Verifies the cluster disks status.
853
854   """
855   _OP_REQP = []
856
857   def CheckPrereq(self):
858     """Check prerequisites.
859
860     This has no prerequisites.
861
862     """
863     pass
864
865   def Exec(self, feedback_fn):
866     """Verify integrity of cluster disks.
867
868     """
869     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
870
871     vg_name = self.cfg.GetVGName()
872     nodes = utils.NiceSort(self.cfg.GetNodeList())
873     instances = [self.cfg.GetInstanceInfo(name)
874                  for name in self.cfg.GetInstanceList()]
875
876     nv_dict = {}
877     for inst in instances:
878       inst_lvs = {}
879       if (inst.status != "up" or
880           inst.disk_template not in constants.DTS_NET_MIRROR):
881         continue
882       inst.MapLVsByNode(inst_lvs)
883       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
884       for node, vol_list in inst_lvs.iteritems():
885         for vol in vol_list:
886           nv_dict[(node, vol)] = inst
887
888     if not nv_dict:
889       return result
890
891     node_lvs = rpc.call_volume_list(nodes, vg_name)
892
893     to_act = set()
894     for node in nodes:
895       # node_volume
896       lvs = node_lvs[node]
897
898       if isinstance(lvs, basestring):
899         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
900         res_nlvm[node] = lvs
901       elif not isinstance(lvs, dict):
902         logger.Info("connection to node %s failed or invalid data returned" %
903                     (node,))
904         res_nodes.append(node)
905         continue
906
907       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
908         inst = nv_dict.pop((node, lv_name), None)
909         if (not lv_online and inst is not None
910             and inst.name not in res_instances):
911           res_instances.append(inst.name)
912
913     # any leftover items in nv_dict are missing LVs, let's arrange the
914     # data better
915     for key, inst in nv_dict.iteritems():
916       if inst.name not in res_missing:
917         res_missing[inst.name] = []
918       res_missing[inst.name].append(key)
919
920     return result
921
922
923 class LURenameCluster(LogicalUnit):
924   """Rename the cluster.
925
926   """
927   HPATH = "cluster-rename"
928   HTYPE = constants.HTYPE_CLUSTER
929   _OP_REQP = ["name"]
930
931   def BuildHooksEnv(self):
932     """Build hooks env.
933
934     """
935     env = {
936       "OP_TARGET": self.sstore.GetClusterName(),
937       "NEW_NAME": self.op.name,
938       }
939     mn = self.sstore.GetMasterNode()
940     return env, [mn], [mn]
941
942   def CheckPrereq(self):
943     """Verify that the passed name is a valid one.
944
945     """
946     hostname = utils.HostInfo(self.op.name)
947
948     new_name = hostname.name
949     self.ip = new_ip = hostname.ip
950     old_name = self.sstore.GetClusterName()
951     old_ip = self.sstore.GetMasterIP()
952     if new_name == old_name and new_ip == old_ip:
953       raise errors.OpPrereqError("Neither the name nor the IP address of the"
954                                  " cluster has changed")
955     if new_ip != old_ip:
956       result = utils.RunCmd(["fping", "-q", new_ip])
957       if not result.failed:
958         raise errors.OpPrereqError("The given cluster IP address (%s) is"
959                                    " reachable on the network. Aborting." %
960                                    new_ip)
961
962     self.op.name = new_name
963
964   def Exec(self, feedback_fn):
965     """Rename the cluster.
966
967     """
968     clustername = self.op.name
969     ip = self.ip
970     ss = self.sstore
971
972     # shutdown the master IP
973     master = ss.GetMasterNode()
974     if not rpc.call_node_stop_master(master):
975       raise errors.OpExecError("Could not disable the master role")
976
977     try:
978       # modify the sstore
979       ss.SetKey(ss.SS_MASTER_IP, ip)
980       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
981
982       # Distribute updated ss config to all nodes
983       myself = self.cfg.GetNodeInfo(master)
984       dist_nodes = self.cfg.GetNodeList()
985       if myself.name in dist_nodes:
986         dist_nodes.remove(myself.name)
987
988       logger.Debug("Copying updated ssconf data to all nodes")
989       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
990         fname = ss.KeyToFilename(keyname)
991         result = rpc.call_upload_file(dist_nodes, fname)
992         for to_node in dist_nodes:
993           if not result[to_node]:
994             logger.Error("copy of file %s to node %s failed" %
995                          (fname, to_node))
996     finally:
997       if not rpc.call_node_start_master(master):
998         logger.Error("Could not re-enable the master role on the master,"
999                      " please restart manually.")
1000
1001
1002 def _RecursiveCheckIfLVMBased(disk):
1003   """Check if the given disk or its children are lvm-based.
1004
1005   Args:
1006     disk: ganeti.objects.Disk object
1007
1008   Returns:
1009     boolean indicating whether a LD_LV dev_type was found or not
1010
1011   """
1012   if disk.children:
1013     for chdisk in disk.children:
1014       if _RecursiveCheckIfLVMBased(chdisk):
1015         return True
1016   return disk.dev_type == constants.LD_LV
1017
1018
1019 class LUSetClusterParams(LogicalUnit):
1020   """Change the parameters of the cluster.
1021
1022   """
1023   HPATH = "cluster-modify"
1024   HTYPE = constants.HTYPE_CLUSTER
1025   _OP_REQP = []
1026
1027   def BuildHooksEnv(self):
1028     """Build hooks env.
1029
1030     """
1031     env = {
1032       "OP_TARGET": self.sstore.GetClusterName(),
1033       "NEW_VG_NAME": self.op.vg_name,
1034       }
1035     mn = self.sstore.GetMasterNode()
1036     return env, [mn], [mn]
1037
1038   def CheckPrereq(self):
1039     """Check prerequisites.
1040
1041     This checks whether the given params don't conflict and
1042     if the given volume group is valid.
1043
1044     """
1045     if not self.op.vg_name:
1046       instances = [self.cfg.GetInstanceInfo(name)
1047                    for name in self.cfg.GetInstanceList()]
1048       for inst in instances:
1049         for disk in inst.disks:
1050           if _RecursiveCheckIfLVMBased(disk):
1051             raise errors.OpPrereqError("Cannot disable lvm storage while"
1052                                        " lvm-based instances exist")
1053
1054     # if vg_name not None, checks given volume group on all nodes
1055     if self.op.vg_name:
1056       node_list = self.cfg.GetNodeList()
1057       vglist = rpc.call_vg_list(node_list)
1058       for node in node_list:
1059         vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
1060         if vgstatus:
1061           raise errors.OpPrereqError("Error on node '%s': %s" %
1062                                      (node, vgstatus))
1063
1064   def Exec(self, feedback_fn):
1065     """Change the parameters of the cluster.
1066
1067     """
1068     if self.op.vg_name != self.cfg.GetVGName():
1069       self.cfg.SetVGName(self.op.vg_name)
1070     else:
1071       feedback_fn("Cluster LVM configuration already in desired"
1072                   " state, not changing")
1073
1074
1075 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1076   """Sleep and poll for an instance's disk to sync.
1077
1078   """
1079   if not instance.disks:
1080     return True
1081
1082   if not oneshot:
1083     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1084
1085   node = instance.primary_node
1086
1087   for dev in instance.disks:
1088     cfgw.SetDiskID(dev, node)
1089
1090   retries = 0
1091   while True:
1092     max_time = 0
1093     done = True
1094     cumul_degraded = False
1095     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1096     if not rstats:
1097       proc.LogWarning("Can't get any data from node %s" % node)
1098       retries += 1
1099       if retries >= 10:
1100         raise errors.RemoteError("Can't contact node %s for mirror data,"
1101                                  " aborting." % node)
1102       time.sleep(6)
1103       continue
1104     retries = 0
1105     for i in range(len(rstats)):
1106       mstat = rstats[i]
1107       if mstat is None:
1108         proc.LogWarning("Can't compute data for node %s/%s" %
1109                         (node, instance.disks[i].iv_name))
1110         continue
1111       # we ignore the ldisk parameter
1112       perc_done, est_time, is_degraded, _ = mstat
1113       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1114       if perc_done is not None:
1115         done = False
1116         if est_time is not None:
1117           rem_time = "%d estimated seconds remaining" % est_time
1118           max_time = est_time
1119         else:
1120           rem_time = "no time estimate"
1121         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1122                      (instance.disks[i].iv_name, perc_done, rem_time))
1123     if done or oneshot:
1124       break
1125
1126     if unlock:
1127       utils.Unlock('cmd')
1128     try:
1129       time.sleep(min(60, max_time))
1130     finally:
1131       if unlock:
1132         utils.Lock('cmd')
1133
1134   if done:
1135     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1136   return not cumul_degraded
1137
1138
1139 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1140   """Check that mirrors are not degraded.
1141
1142   The ldisk parameter, if True, will change the test from the
1143   is_degraded attribute (which represents overall non-ok status for
1144   the device(s)) to the ldisk (representing the local storage status).
1145
1146   """
1147   cfgw.SetDiskID(dev, node)
1148   if ldisk:
1149     idx = 6
1150   else:
1151     idx = 5
1152
1153   result = True
1154   if on_primary or dev.AssembleOnSecondary():
1155     rstats = rpc.call_blockdev_find(node, dev)
1156     if not rstats:
1157       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1158       result = False
1159     else:
1160       result = result and (not rstats[idx])
1161   if dev.children:
1162     for child in dev.children:
1163       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1164
1165   return result
1166
1167
1168 class LUDiagnoseOS(NoHooksLU):
1169   """Logical unit for OS diagnose/query.
1170
1171   """
1172   _OP_REQP = []
1173
1174   def CheckPrereq(self):
1175     """Check prerequisites.
1176
1177     This always succeeds, since this is a pure query LU.
1178
1179     """
1180     return
1181
1182   def Exec(self, feedback_fn):
1183     """Compute the list of OSes.
1184
1185     """
1186     node_list = self.cfg.GetNodeList()
1187     node_data = rpc.call_os_diagnose(node_list)
1188     if node_data == False:
1189       raise errors.OpExecError("Can't gather the list of OSes")
1190     return node_data
1191
1192
1193 class LURemoveNode(LogicalUnit):
1194   """Logical unit for removing a node.
1195
1196   """
1197   HPATH = "node-remove"
1198   HTYPE = constants.HTYPE_NODE
1199   _OP_REQP = ["node_name"]
1200
1201   def BuildHooksEnv(self):
1202     """Build hooks env.
1203
1204     This doesn't run on the target node in the pre phase as a failed
1205     node would not allows itself to run.
1206
1207     """
1208     env = {
1209       "OP_TARGET": self.op.node_name,
1210       "NODE_NAME": self.op.node_name,
1211       }
1212     all_nodes = self.cfg.GetNodeList()
1213     all_nodes.remove(self.op.node_name)
1214     return env, all_nodes, all_nodes
1215
1216   def CheckPrereq(self):
1217     """Check prerequisites.
1218
1219     This checks:
1220      - the node exists in the configuration
1221      - it does not have primary or secondary instances
1222      - it's not the master
1223
1224     Any errors are signalled by raising errors.OpPrereqError.
1225
1226     """
1227     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1228     if node is None:
1229       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1230
1231     instance_list = self.cfg.GetInstanceList()
1232
1233     masternode = self.sstore.GetMasterNode()
1234     if node.name == masternode:
1235       raise errors.OpPrereqError("Node is the master node,"
1236                                  " you need to failover first.")
1237
1238     for instance_name in instance_list:
1239       instance = self.cfg.GetInstanceInfo(instance_name)
1240       if node.name == instance.primary_node:
1241         raise errors.OpPrereqError("Instance %s still running on the node,"
1242                                    " please remove first." % instance_name)
1243       if node.name in instance.secondary_nodes:
1244         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1245                                    " please remove first." % instance_name)
1246     self.op.node_name = node.name
1247     self.node = node
1248
1249   def Exec(self, feedback_fn):
1250     """Removes the node from the cluster.
1251
1252     """
1253     node = self.node
1254     logger.Info("stopping the node daemon and removing configs from node %s" %
1255                 node.name)
1256
1257     rpc.call_node_leave_cluster(node.name)
1258
1259     self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1260
1261     logger.Info("Removing node %s from config" % node.name)
1262
1263     self.cfg.RemoveNode(node.name)
1264
1265     _RemoveHostFromEtcHosts(node.name)
1266
1267
1268 class LUQueryNodes(NoHooksLU):
1269   """Logical unit for querying nodes.
1270
1271   """
1272   _OP_REQP = ["output_fields", "names"]
1273
1274   def CheckPrereq(self):
1275     """Check prerequisites.
1276
1277     This checks that the fields required are valid output fields.
1278
1279     """
1280     self.dynamic_fields = frozenset(["dtotal", "dfree",
1281                                      "mtotal", "mnode", "mfree",
1282                                      "bootid"])
1283
1284     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1285                                "pinst_list", "sinst_list",
1286                                "pip", "sip"],
1287                        dynamic=self.dynamic_fields,
1288                        selected=self.op.output_fields)
1289
1290     self.wanted = _GetWantedNodes(self, self.op.names)
1291
1292   def Exec(self, feedback_fn):
1293     """Computes the list of nodes and their attributes.
1294
1295     """
1296     nodenames = self.wanted
1297     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1298
1299     # begin data gathering
1300
1301     if self.dynamic_fields.intersection(self.op.output_fields):
1302       live_data = {}
1303       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1304       for name in nodenames:
1305         nodeinfo = node_data.get(name, None)
1306         if nodeinfo:
1307           live_data[name] = {
1308             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1309             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1310             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1311             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1312             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1313             "bootid": nodeinfo['bootid'],
1314             }
1315         else:
1316           live_data[name] = {}
1317     else:
1318       live_data = dict.fromkeys(nodenames, {})
1319
1320     node_to_primary = dict([(name, set()) for name in nodenames])
1321     node_to_secondary = dict([(name, set()) for name in nodenames])
1322
1323     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1324                              "sinst_cnt", "sinst_list"))
1325     if inst_fields & frozenset(self.op.output_fields):
1326       instancelist = self.cfg.GetInstanceList()
1327
1328       for instance_name in instancelist:
1329         inst = self.cfg.GetInstanceInfo(instance_name)
1330         if inst.primary_node in node_to_primary:
1331           node_to_primary[inst.primary_node].add(inst.name)
1332         for secnode in inst.secondary_nodes:
1333           if secnode in node_to_secondary:
1334             node_to_secondary[secnode].add(inst.name)
1335
1336     # end data gathering
1337
1338     output = []
1339     for node in nodelist:
1340       node_output = []
1341       for field in self.op.output_fields:
1342         if field == "name":
1343           val = node.name
1344         elif field == "pinst_list":
1345           val = list(node_to_primary[node.name])
1346         elif field == "sinst_list":
1347           val = list(node_to_secondary[node.name])
1348         elif field == "pinst_cnt":
1349           val = len(node_to_primary[node.name])
1350         elif field == "sinst_cnt":
1351           val = len(node_to_secondary[node.name])
1352         elif field == "pip":
1353           val = node.primary_ip
1354         elif field == "sip":
1355           val = node.secondary_ip
1356         elif field in self.dynamic_fields:
1357           val = live_data[node.name].get(field, None)
1358         else:
1359           raise errors.ParameterError(field)
1360         node_output.append(val)
1361       output.append(node_output)
1362
1363     return output
1364
1365
1366 class LUQueryNodeVolumes(NoHooksLU):
1367   """Logical unit for getting volumes on node(s).
1368
1369   """
1370   _OP_REQP = ["nodes", "output_fields"]
1371
1372   def CheckPrereq(self):
1373     """Check prerequisites.
1374
1375     This checks that the fields required are valid output fields.
1376
1377     """
1378     self.nodes = _GetWantedNodes(self, self.op.nodes)
1379
1380     _CheckOutputFields(static=["node"],
1381                        dynamic=["phys", "vg", "name", "size", "instance"],
1382                        selected=self.op.output_fields)
1383
1384
1385   def Exec(self, feedback_fn):
1386     """Computes the list of nodes and their attributes.
1387
1388     """
1389     nodenames = self.nodes
1390     volumes = rpc.call_node_volumes(nodenames)
1391
1392     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1393              in self.cfg.GetInstanceList()]
1394
1395     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1396
1397     output = []
1398     for node in nodenames:
1399       if node not in volumes or not volumes[node]:
1400         continue
1401
1402       node_vols = volumes[node][:]
1403       node_vols.sort(key=lambda vol: vol['dev'])
1404
1405       for vol in node_vols:
1406         node_output = []
1407         for field in self.op.output_fields:
1408           if field == "node":
1409             val = node
1410           elif field == "phys":
1411             val = vol['dev']
1412           elif field == "vg":
1413             val = vol['vg']
1414           elif field == "name":
1415             val = vol['name']
1416           elif field == "size":
1417             val = int(float(vol['size']))
1418           elif field == "instance":
1419             for inst in ilist:
1420               if node not in lv_by_node[inst]:
1421                 continue
1422               if vol['name'] in lv_by_node[inst][node]:
1423                 val = inst.name
1424                 break
1425             else:
1426               val = '-'
1427           else:
1428             raise errors.ParameterError(field)
1429           node_output.append(str(val))
1430
1431         output.append(node_output)
1432
1433     return output
1434
1435
1436 class LUAddNode(LogicalUnit):
1437   """Logical unit for adding node to the cluster.
1438
1439   """
1440   HPATH = "node-add"
1441   HTYPE = constants.HTYPE_NODE
1442   _OP_REQP = ["node_name"]
1443
1444   def BuildHooksEnv(self):
1445     """Build hooks env.
1446
1447     This will run on all nodes before, and on all nodes + the new node after.
1448
1449     """
1450     env = {
1451       "OP_TARGET": self.op.node_name,
1452       "NODE_NAME": self.op.node_name,
1453       "NODE_PIP": self.op.primary_ip,
1454       "NODE_SIP": self.op.secondary_ip,
1455       }
1456     nodes_0 = self.cfg.GetNodeList()
1457     nodes_1 = nodes_0 + [self.op.node_name, ]
1458     return env, nodes_0, nodes_1
1459
1460   def CheckPrereq(self):
1461     """Check prerequisites.
1462
1463     This checks:
1464      - the new node is not already in the config
1465      - it is resolvable
1466      - its parameters (single/dual homed) matches the cluster
1467
1468     Any errors are signalled by raising errors.OpPrereqError.
1469
1470     """
1471     node_name = self.op.node_name
1472     cfg = self.cfg
1473
1474     dns_data = utils.HostInfo(node_name)
1475
1476     node = dns_data.name
1477     primary_ip = self.op.primary_ip = dns_data.ip
1478     secondary_ip = getattr(self.op, "secondary_ip", None)
1479     if secondary_ip is None:
1480       secondary_ip = primary_ip
1481     if not utils.IsValidIP(secondary_ip):
1482       raise errors.OpPrereqError("Invalid secondary IP given")
1483     self.op.secondary_ip = secondary_ip
1484     node_list = cfg.GetNodeList()
1485     if node in node_list:
1486       raise errors.OpPrereqError("Node %s is already in the configuration"
1487                                  % node)
1488
1489     for existing_node_name in node_list:
1490       existing_node = cfg.GetNodeInfo(existing_node_name)
1491       if (existing_node.primary_ip == primary_ip or
1492           existing_node.secondary_ip == primary_ip or
1493           existing_node.primary_ip == secondary_ip or
1494           existing_node.secondary_ip == secondary_ip):
1495         raise errors.OpPrereqError("New node ip address(es) conflict with"
1496                                    " existing node %s" % existing_node.name)
1497
1498     # check that the type of the node (single versus dual homed) is the
1499     # same as for the master
1500     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1501     master_singlehomed = myself.secondary_ip == myself.primary_ip
1502     newbie_singlehomed = secondary_ip == primary_ip
1503     if master_singlehomed != newbie_singlehomed:
1504       if master_singlehomed:
1505         raise errors.OpPrereqError("The master has no private ip but the"
1506                                    " new node has one")
1507       else:
1508         raise errors.OpPrereqError("The master has a private ip but the"
1509                                    " new node doesn't have one")
1510
1511     # checks reachablity
1512     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1513       raise errors.OpPrereqError("Node not reachable by ping")
1514
1515     if not newbie_singlehomed:
1516       # check reachability from my secondary ip to newbie's secondary ip
1517       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1518                            source=myself.secondary_ip):
1519         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1520                                    " based ping to noded port")
1521
1522     self.new_node = objects.Node(name=node,
1523                                  primary_ip=primary_ip,
1524                                  secondary_ip=secondary_ip)
1525
1526     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1527       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1528         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1529                                    constants.VNC_PASSWORD_FILE)
1530
1531   def Exec(self, feedback_fn):
1532     """Adds the new node to the cluster.
1533
1534     """
1535     new_node = self.new_node
1536     node = new_node.name
1537
1538     # set up inter-node password and certificate and restarts the node daemon
1539     gntpass = self.sstore.GetNodeDaemonPassword()
1540     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1541       raise errors.OpExecError("ganeti password corruption detected")
1542     f = open(constants.SSL_CERT_FILE)
1543     try:
1544       gntpem = f.read(8192)
1545     finally:
1546       f.close()
1547     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1548     # so we use this to detect an invalid certificate; as long as the
1549     # cert doesn't contain this, the here-document will be correctly
1550     # parsed by the shell sequence below
1551     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1552       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1553     if not gntpem.endswith("\n"):
1554       raise errors.OpExecError("PEM must end with newline")
1555     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1556
1557     # and then connect with ssh to set password and start ganeti-noded
1558     # note that all the below variables are sanitized at this point,
1559     # either by being constants or by the checks above
1560     ss = self.sstore
1561     mycommand = ("umask 077 && "
1562                  "echo '%s' > '%s' && "
1563                  "cat > '%s' << '!EOF.' && \n"
1564                  "%s!EOF.\n%s restart" %
1565                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1566                   constants.SSL_CERT_FILE, gntpem,
1567                   constants.NODE_INITD_SCRIPT))
1568
1569     result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1570     if result.failed:
1571       raise errors.OpExecError("Remote command on node %s, error: %s,"
1572                                " output: %s" %
1573                                (node, result.fail_reason, result.output))
1574
1575     # check connectivity
1576     time.sleep(4)
1577
1578     result = rpc.call_version([node])[node]
1579     if result:
1580       if constants.PROTOCOL_VERSION == result:
1581         logger.Info("communication to node %s fine, sw version %s match" %
1582                     (node, result))
1583       else:
1584         raise errors.OpExecError("Version mismatch master version %s,"
1585                                  " node version %s" %
1586                                  (constants.PROTOCOL_VERSION, result))
1587     else:
1588       raise errors.OpExecError("Cannot get version from the new node")
1589
1590     # setup ssh on node
1591     logger.Info("copy ssh key to node %s" % node)
1592     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1593     keyarray = []
1594     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1595                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1596                 priv_key, pub_key]
1597
1598     for i in keyfiles:
1599       f = open(i, 'r')
1600       try:
1601         keyarray.append(f.read())
1602       finally:
1603         f.close()
1604
1605     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1606                                keyarray[3], keyarray[4], keyarray[5])
1607
1608     if not result:
1609       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1610
1611     # Add node to our /etc/hosts, and add key to known_hosts
1612     _AddHostToEtcHosts(new_node.name)
1613
1614     if new_node.secondary_ip != new_node.primary_ip:
1615       if not rpc.call_node_tcp_ping(new_node.name,
1616                                     constants.LOCALHOST_IP_ADDRESS,
1617                                     new_node.secondary_ip,
1618                                     constants.DEFAULT_NODED_PORT,
1619                                     10, False):
1620         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1621                                  " you gave (%s). Please fix and re-run this"
1622                                  " command." % new_node.secondary_ip)
1623
1624     success, msg = self.ssh.VerifyNodeHostname(node)
1625     if not success:
1626       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1627                                " than the one the resolver gives: %s."
1628                                " Please fix and re-run this command." %
1629                                (node, msg))
1630
1631     # Distribute updated /etc/hosts and known_hosts to all nodes,
1632     # including the node just added
1633     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1634     dist_nodes = self.cfg.GetNodeList() + [node]
1635     if myself.name in dist_nodes:
1636       dist_nodes.remove(myself.name)
1637
1638     logger.Debug("Copying hosts and known_hosts to all nodes")
1639     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1640       result = rpc.call_upload_file(dist_nodes, fname)
1641       for to_node in dist_nodes:
1642         if not result[to_node]:
1643           logger.Error("copy of file %s to node %s failed" %
1644                        (fname, to_node))
1645
1646     to_copy = ss.GetFileList()
1647     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1648       to_copy.append(constants.VNC_PASSWORD_FILE)
1649     for fname in to_copy:
1650       if not self.ssh.CopyFileToNode(node, fname):
1651         logger.Error("could not copy file %s to node %s" % (fname, node))
1652
1653     logger.Info("adding node %s to cluster.conf" % node)
1654     self.cfg.AddNode(new_node)
1655
1656
1657 class LUMasterFailover(LogicalUnit):
1658   """Failover the master node to the current node.
1659
1660   This is a special LU in that it must run on a non-master node.
1661
1662   """
1663   HPATH = "master-failover"
1664   HTYPE = constants.HTYPE_CLUSTER
1665   REQ_MASTER = False
1666   _OP_REQP = []
1667
1668   def BuildHooksEnv(self):
1669     """Build hooks env.
1670
1671     This will run on the new master only in the pre phase, and on all
1672     the nodes in the post phase.
1673
1674     """
1675     env = {
1676       "OP_TARGET": self.new_master,
1677       "NEW_MASTER": self.new_master,
1678       "OLD_MASTER": self.old_master,
1679       }
1680     return env, [self.new_master], self.cfg.GetNodeList()
1681
1682   def CheckPrereq(self):
1683     """Check prerequisites.
1684
1685     This checks that we are not already the master.
1686
1687     """
1688     self.new_master = utils.HostInfo().name
1689     self.old_master = self.sstore.GetMasterNode()
1690
1691     if self.old_master == self.new_master:
1692       raise errors.OpPrereqError("This commands must be run on the node"
1693                                  " where you want the new master to be."
1694                                  " %s is already the master" %
1695                                  self.old_master)
1696
1697   def Exec(self, feedback_fn):
1698     """Failover the master node.
1699
1700     This command, when run on a non-master node, will cause the current
1701     master to cease being master, and the non-master to become new
1702     master.
1703
1704     """
1705     #TODO: do not rely on gethostname returning the FQDN
1706     logger.Info("setting master to %s, old master: %s" %
1707                 (self.new_master, self.old_master))
1708
1709     if not rpc.call_node_stop_master(self.old_master):
1710       logger.Error("could disable the master role on the old master"
1711                    " %s, please disable manually" % self.old_master)
1712
1713     ss = self.sstore
1714     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1715     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1716                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1717       logger.Error("could not distribute the new simple store master file"
1718                    " to the other nodes, please check.")
1719
1720     if not rpc.call_node_start_master(self.new_master):
1721       logger.Error("could not start the master role on the new master"
1722                    " %s, please check" % self.new_master)
1723       feedback_fn("Error in activating the master IP on the new master,"
1724                   " please fix manually.")
1725
1726
1727
1728 class LUQueryClusterInfo(NoHooksLU):
1729   """Query cluster configuration.
1730
1731   """
1732   _OP_REQP = []
1733   REQ_MASTER = False
1734
1735   def CheckPrereq(self):
1736     """No prerequsites needed for this LU.
1737
1738     """
1739     pass
1740
1741   def Exec(self, feedback_fn):
1742     """Return cluster config.
1743
1744     """
1745     result = {
1746       "name": self.sstore.GetClusterName(),
1747       "software_version": constants.RELEASE_VERSION,
1748       "protocol_version": constants.PROTOCOL_VERSION,
1749       "config_version": constants.CONFIG_VERSION,
1750       "os_api_version": constants.OS_API_VERSION,
1751       "export_version": constants.EXPORT_VERSION,
1752       "master": self.sstore.GetMasterNode(),
1753       "architecture": (platform.architecture()[0], platform.machine()),
1754       }
1755
1756     return result
1757
1758
1759 class LUClusterCopyFile(NoHooksLU):
1760   """Copy file to cluster.
1761
1762   """
1763   _OP_REQP = ["nodes", "filename"]
1764
1765   def CheckPrereq(self):
1766     """Check prerequisites.
1767
1768     It should check that the named file exists and that the given list
1769     of nodes is valid.
1770
1771     """
1772     if not os.path.exists(self.op.filename):
1773       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1774
1775     self.nodes = _GetWantedNodes(self, self.op.nodes)
1776
1777   def Exec(self, feedback_fn):
1778     """Copy a file from master to some nodes.
1779
1780     Args:
1781       opts - class with options as members
1782       args - list containing a single element, the file name
1783     Opts used:
1784       nodes - list containing the name of target nodes; if empty, all nodes
1785
1786     """
1787     filename = self.op.filename
1788
1789     myname = utils.HostInfo().name
1790
1791     for node in self.nodes:
1792       if node == myname:
1793         continue
1794       if not self.ssh.CopyFileToNode(node, filename):
1795         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1796
1797
1798 class LUDumpClusterConfig(NoHooksLU):
1799   """Return a text-representation of the cluster-config.
1800
1801   """
1802   _OP_REQP = []
1803
1804   def CheckPrereq(self):
1805     """No prerequisites.
1806
1807     """
1808     pass
1809
1810   def Exec(self, feedback_fn):
1811     """Dump a representation of the cluster config to the standard output.
1812
1813     """
1814     return self.cfg.DumpConfig()
1815
1816
1817 class LURunClusterCommand(NoHooksLU):
1818   """Run a command on some nodes.
1819
1820   """
1821   _OP_REQP = ["command", "nodes"]
1822
1823   def CheckPrereq(self):
1824     """Check prerequisites.
1825
1826     It checks that the given list of nodes is valid.
1827
1828     """
1829     self.nodes = _GetWantedNodes(self, self.op.nodes)
1830
1831   def Exec(self, feedback_fn):
1832     """Run a command on some nodes.
1833
1834     """
1835     # put the master at the end of the nodes list
1836     master_node = self.sstore.GetMasterNode()
1837     if master_node in self.nodes:
1838       self.nodes.remove(master_node)
1839       self.nodes.append(master_node)
1840
1841     data = []
1842     for node in self.nodes:
1843       result = self.ssh.Run(node, "root", self.op.command)
1844       data.append((node, result.output, result.exit_code))
1845
1846     return data
1847
1848
1849 class LUActivateInstanceDisks(NoHooksLU):
1850   """Bring up an instance's disks.
1851
1852   """
1853   _OP_REQP = ["instance_name"]
1854
1855   def CheckPrereq(self):
1856     """Check prerequisites.
1857
1858     This checks that the instance is in the cluster.
1859
1860     """
1861     instance = self.cfg.GetInstanceInfo(
1862       self.cfg.ExpandInstanceName(self.op.instance_name))
1863     if instance is None:
1864       raise errors.OpPrereqError("Instance '%s' not known" %
1865                                  self.op.instance_name)
1866     self.instance = instance
1867
1868
1869   def Exec(self, feedback_fn):
1870     """Activate the disks.
1871
1872     """
1873     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1874     if not disks_ok:
1875       raise errors.OpExecError("Cannot activate block devices")
1876
1877     return disks_info
1878
1879
1880 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1881   """Prepare the block devices for an instance.
1882
1883   This sets up the block devices on all nodes.
1884
1885   Args:
1886     instance: a ganeti.objects.Instance object
1887     ignore_secondaries: if true, errors on secondary nodes won't result
1888                         in an error return from the function
1889
1890   Returns:
1891     false if the operation failed
1892     list of (host, instance_visible_name, node_visible_name) if the operation
1893          suceeded with the mapping from node devices to instance devices
1894   """
1895   device_info = []
1896   disks_ok = True
1897   iname = instance.name
1898   # With the two passes mechanism we try to reduce the window of
1899   # opportunity for the race condition of switching DRBD to primary
1900   # before handshaking occured, but we do not eliminate it
1901
1902   # The proper fix would be to wait (with some limits) until the
1903   # connection has been made and drbd transitions from WFConnection
1904   # into any other network-connected state (Connected, SyncTarget,
1905   # SyncSource, etc.)
1906
1907   # 1st pass, assemble on all nodes in secondary mode
1908   for inst_disk in instance.disks:
1909     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1910       cfg.SetDiskID(node_disk, node)
1911       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1912       if not result:
1913         logger.Error("could not prepare block device %s on node %s"
1914                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1915         if not ignore_secondaries:
1916           disks_ok = False
1917
1918   # FIXME: race condition on drbd migration to primary
1919
1920   # 2nd pass, do only the primary node
1921   for inst_disk in instance.disks:
1922     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1923       if node != instance.primary_node:
1924         continue
1925       cfg.SetDiskID(node_disk, node)
1926       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1927       if not result:
1928         logger.Error("could not prepare block device %s on node %s"
1929                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1930         disks_ok = False
1931     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1932
1933   # leave the disks configured for the primary node
1934   # this is a workaround that would be fixed better by
1935   # improving the logical/physical id handling
1936   for disk in instance.disks:
1937     cfg.SetDiskID(disk, instance.primary_node)
1938
1939   return disks_ok, device_info
1940
1941
1942 def _StartInstanceDisks(cfg, instance, force):
1943   """Start the disks of an instance.
1944
1945   """
1946   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1947                                            ignore_secondaries=force)
1948   if not disks_ok:
1949     _ShutdownInstanceDisks(instance, cfg)
1950     if force is not None and not force:
1951       logger.Error("If the message above refers to a secondary node,"
1952                    " you can retry the operation using '--force'.")
1953     raise errors.OpExecError("Disk consistency error")
1954
1955
1956 class LUDeactivateInstanceDisks(NoHooksLU):
1957   """Shutdown an instance's disks.
1958
1959   """
1960   _OP_REQP = ["instance_name"]
1961
1962   def CheckPrereq(self):
1963     """Check prerequisites.
1964
1965     This checks that the instance is in the cluster.
1966
1967     """
1968     instance = self.cfg.GetInstanceInfo(
1969       self.cfg.ExpandInstanceName(self.op.instance_name))
1970     if instance is None:
1971       raise errors.OpPrereqError("Instance '%s' not known" %
1972                                  self.op.instance_name)
1973     self.instance = instance
1974
1975   def Exec(self, feedback_fn):
1976     """Deactivate the disks
1977
1978     """
1979     instance = self.instance
1980     ins_l = rpc.call_instance_list([instance.primary_node])
1981     ins_l = ins_l[instance.primary_node]
1982     if not type(ins_l) is list:
1983       raise errors.OpExecError("Can't contact node '%s'" %
1984                                instance.primary_node)
1985
1986     if self.instance.name in ins_l:
1987       raise errors.OpExecError("Instance is running, can't shutdown"
1988                                " block devices.")
1989
1990     _ShutdownInstanceDisks(instance, self.cfg)
1991
1992
1993 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1994   """Shutdown block devices of an instance.
1995
1996   This does the shutdown on all nodes of the instance.
1997
1998   If the ignore_primary is false, errors on the primary node are
1999   ignored.
2000
2001   """
2002   result = True
2003   for disk in instance.disks:
2004     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2005       cfg.SetDiskID(top_disk, node)
2006       if not rpc.call_blockdev_shutdown(node, top_disk):
2007         logger.Error("could not shutdown block device %s on node %s" %
2008                      (disk.iv_name, node))
2009         if not ignore_primary or node != instance.primary_node:
2010           result = False
2011   return result
2012
2013
2014 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2015   """Checks if a node has enough free memory.
2016
2017   This function check if a given node has the needed amount of free
2018   memory. In case the node has less memory or we cannot get the
2019   information from the node, this function raise an OpPrereqError
2020   exception.
2021
2022   Args:
2023     - cfg: a ConfigWriter instance
2024     - node: the node name
2025     - reason: string to use in the error message
2026     - requested: the amount of memory in MiB
2027
2028   """
2029   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2030   if not nodeinfo or not isinstance(nodeinfo, dict):
2031     raise errors.OpPrereqError("Could not contact node %s for resource"
2032                              " information" % (node,))
2033
2034   free_mem = nodeinfo[node].get('memory_free')
2035   if not isinstance(free_mem, int):
2036     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2037                              " was '%s'" % (node, free_mem))
2038   if requested > free_mem:
2039     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2040                              " needed %s MiB, available %s MiB" %
2041                              (node, reason, requested, free_mem))
2042
2043
2044 class LUStartupInstance(LogicalUnit):
2045   """Starts an instance.
2046
2047   """
2048   HPATH = "instance-start"
2049   HTYPE = constants.HTYPE_INSTANCE
2050   _OP_REQP = ["instance_name", "force"]
2051
2052   def BuildHooksEnv(self):
2053     """Build hooks env.
2054
2055     This runs on master, primary and secondary nodes of the instance.
2056
2057     """
2058     env = {
2059       "FORCE": self.op.force,
2060       }
2061     env.update(_BuildInstanceHookEnvByObject(self.instance))
2062     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2063           list(self.instance.secondary_nodes))
2064     return env, nl, nl
2065
2066   def CheckPrereq(self):
2067     """Check prerequisites.
2068
2069     This checks that the instance is in the cluster.
2070
2071     """
2072     instance = self.cfg.GetInstanceInfo(
2073       self.cfg.ExpandInstanceName(self.op.instance_name))
2074     if instance is None:
2075       raise errors.OpPrereqError("Instance '%s' not known" %
2076                                  self.op.instance_name)
2077
2078     # check bridges existance
2079     _CheckInstanceBridgesExist(instance)
2080
2081     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2082                          "starting instance %s" % instance.name,
2083                          instance.memory)
2084
2085     self.instance = instance
2086     self.op.instance_name = instance.name
2087
2088   def Exec(self, feedback_fn):
2089     """Start the instance.
2090
2091     """
2092     instance = self.instance
2093     force = self.op.force
2094     extra_args = getattr(self.op, "extra_args", "")
2095
2096     self.cfg.MarkInstanceUp(instance.name)
2097
2098     node_current = instance.primary_node
2099
2100     _StartInstanceDisks(self.cfg, instance, force)
2101
2102     if not rpc.call_instance_start(node_current, instance, extra_args):
2103       _ShutdownInstanceDisks(instance, self.cfg)
2104       raise errors.OpExecError("Could not start instance")
2105
2106
2107 class LURebootInstance(LogicalUnit):
2108   """Reboot an instance.
2109
2110   """
2111   HPATH = "instance-reboot"
2112   HTYPE = constants.HTYPE_INSTANCE
2113   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2114
2115   def BuildHooksEnv(self):
2116     """Build hooks env.
2117
2118     This runs on master, primary and secondary nodes of the instance.
2119
2120     """
2121     env = {
2122       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2123       }
2124     env.update(_BuildInstanceHookEnvByObject(self.instance))
2125     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2126           list(self.instance.secondary_nodes))
2127     return env, nl, nl
2128
2129   def CheckPrereq(self):
2130     """Check prerequisites.
2131
2132     This checks that the instance is in the cluster.
2133
2134     """
2135     instance = self.cfg.GetInstanceInfo(
2136       self.cfg.ExpandInstanceName(self.op.instance_name))
2137     if instance is None:
2138       raise errors.OpPrereqError("Instance '%s' not known" %
2139                                  self.op.instance_name)
2140
2141     # check bridges existance
2142     _CheckInstanceBridgesExist(instance)
2143
2144     self.instance = instance
2145     self.op.instance_name = instance.name
2146
2147   def Exec(self, feedback_fn):
2148     """Reboot the instance.
2149
2150     """
2151     instance = self.instance
2152     ignore_secondaries = self.op.ignore_secondaries
2153     reboot_type = self.op.reboot_type
2154     extra_args = getattr(self.op, "extra_args", "")
2155
2156     node_current = instance.primary_node
2157
2158     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2159                            constants.INSTANCE_REBOOT_HARD,
2160                            constants.INSTANCE_REBOOT_FULL]:
2161       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2162                                   (constants.INSTANCE_REBOOT_SOFT,
2163                                    constants.INSTANCE_REBOOT_HARD,
2164                                    constants.INSTANCE_REBOOT_FULL))
2165
2166     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2167                        constants.INSTANCE_REBOOT_HARD]:
2168       if not rpc.call_instance_reboot(node_current, instance,
2169                                       reboot_type, extra_args):
2170         raise errors.OpExecError("Could not reboot instance")
2171     else:
2172       if not rpc.call_instance_shutdown(node_current, instance):
2173         raise errors.OpExecError("could not shutdown instance for full reboot")
2174       _ShutdownInstanceDisks(instance, self.cfg)
2175       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2176       if not rpc.call_instance_start(node_current, instance, extra_args):
2177         _ShutdownInstanceDisks(instance, self.cfg)
2178         raise errors.OpExecError("Could not start instance for full reboot")
2179
2180     self.cfg.MarkInstanceUp(instance.name)
2181
2182
2183 class LUShutdownInstance(LogicalUnit):
2184   """Shutdown an instance.
2185
2186   """
2187   HPATH = "instance-stop"
2188   HTYPE = constants.HTYPE_INSTANCE
2189   _OP_REQP = ["instance_name"]
2190
2191   def BuildHooksEnv(self):
2192     """Build hooks env.
2193
2194     This runs on master, primary and secondary nodes of the instance.
2195
2196     """
2197     env = _BuildInstanceHookEnvByObject(self.instance)
2198     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2199           list(self.instance.secondary_nodes))
2200     return env, nl, nl
2201
2202   def CheckPrereq(self):
2203     """Check prerequisites.
2204
2205     This checks that the instance is in the cluster.
2206
2207     """
2208     instance = self.cfg.GetInstanceInfo(
2209       self.cfg.ExpandInstanceName(self.op.instance_name))
2210     if instance is None:
2211       raise errors.OpPrereqError("Instance '%s' not known" %
2212                                  self.op.instance_name)
2213     self.instance = instance
2214
2215   def Exec(self, feedback_fn):
2216     """Shutdown the instance.
2217
2218     """
2219     instance = self.instance
2220     node_current = instance.primary_node
2221     self.cfg.MarkInstanceDown(instance.name)
2222     if not rpc.call_instance_shutdown(node_current, instance):
2223       logger.Error("could not shutdown instance")
2224
2225     _ShutdownInstanceDisks(instance, self.cfg)
2226
2227
2228 class LUReinstallInstance(LogicalUnit):
2229   """Reinstall an instance.
2230
2231   """
2232   HPATH = "instance-reinstall"
2233   HTYPE = constants.HTYPE_INSTANCE
2234   _OP_REQP = ["instance_name"]
2235
2236   def BuildHooksEnv(self):
2237     """Build hooks env.
2238
2239     This runs on master, primary and secondary nodes of the instance.
2240
2241     """
2242     env = _BuildInstanceHookEnvByObject(self.instance)
2243     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2244           list(self.instance.secondary_nodes))
2245     return env, nl, nl
2246
2247   def CheckPrereq(self):
2248     """Check prerequisites.
2249
2250     This checks that the instance is in the cluster and is not running.
2251
2252     """
2253     instance = self.cfg.GetInstanceInfo(
2254       self.cfg.ExpandInstanceName(self.op.instance_name))
2255     if instance is None:
2256       raise errors.OpPrereqError("Instance '%s' not known" %
2257                                  self.op.instance_name)
2258     if instance.disk_template == constants.DT_DISKLESS:
2259       raise errors.OpPrereqError("Instance '%s' has no disks" %
2260                                  self.op.instance_name)
2261     if instance.status != "down":
2262       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2263                                  self.op.instance_name)
2264     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2265     if remote_info:
2266       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2267                                  (self.op.instance_name,
2268                                   instance.primary_node))
2269
2270     self.op.os_type = getattr(self.op, "os_type", None)
2271     if self.op.os_type is not None:
2272       # OS verification
2273       pnode = self.cfg.GetNodeInfo(
2274         self.cfg.ExpandNodeName(instance.primary_node))
2275       if pnode is None:
2276         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2277                                    self.op.pnode)
2278       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2279       if not os_obj:
2280         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2281                                    " primary node"  % self.op.os_type)
2282
2283     self.instance = instance
2284
2285   def Exec(self, feedback_fn):
2286     """Reinstall the instance.
2287
2288     """
2289     inst = self.instance
2290
2291     if self.op.os_type is not None:
2292       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2293       inst.os = self.op.os_type
2294       self.cfg.AddInstance(inst)
2295
2296     _StartInstanceDisks(self.cfg, inst, None)
2297     try:
2298       feedback_fn("Running the instance OS create scripts...")
2299       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2300         raise errors.OpExecError("Could not install OS for instance %s"
2301                                  " on node %s" %
2302                                  (inst.name, inst.primary_node))
2303     finally:
2304       _ShutdownInstanceDisks(inst, self.cfg)
2305
2306
2307 class LURenameInstance(LogicalUnit):
2308   """Rename an instance.
2309
2310   """
2311   HPATH = "instance-rename"
2312   HTYPE = constants.HTYPE_INSTANCE
2313   _OP_REQP = ["instance_name", "new_name"]
2314
2315   def BuildHooksEnv(self):
2316     """Build hooks env.
2317
2318     This runs on master, primary and secondary nodes of the instance.
2319
2320     """
2321     env = _BuildInstanceHookEnvByObject(self.instance)
2322     env["INSTANCE_NEW_NAME"] = self.op.new_name
2323     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2324           list(self.instance.secondary_nodes))
2325     return env, nl, nl
2326
2327   def CheckPrereq(self):
2328     """Check prerequisites.
2329
2330     This checks that the instance is in the cluster and is not running.
2331
2332     """
2333     instance = self.cfg.GetInstanceInfo(
2334       self.cfg.ExpandInstanceName(self.op.instance_name))
2335     if instance is None:
2336       raise errors.OpPrereqError("Instance '%s' not known" %
2337                                  self.op.instance_name)
2338     if instance.status != "down":
2339       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2340                                  self.op.instance_name)
2341     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2342     if remote_info:
2343       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2344                                  (self.op.instance_name,
2345                                   instance.primary_node))
2346     self.instance = instance
2347
2348     # new name verification
2349     name_info = utils.HostInfo(self.op.new_name)
2350
2351     self.op.new_name = new_name = name_info.name
2352     instance_list = self.cfg.GetInstanceList()
2353     if new_name in instance_list:
2354       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2355                                  instance_name)
2356
2357     if not getattr(self.op, "ignore_ip", False):
2358       command = ["fping", "-q", name_info.ip]
2359       result = utils.RunCmd(command)
2360       if not result.failed:
2361         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2362                                    (name_info.ip, new_name))
2363
2364
2365   def Exec(self, feedback_fn):
2366     """Reinstall the instance.
2367
2368     """
2369     inst = self.instance
2370     old_name = inst.name
2371
2372     if inst.disk_template == constants.DT_FILE:
2373       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2374
2375     self.cfg.RenameInstance(inst.name, self.op.new_name)
2376
2377     # re-read the instance from the configuration after rename
2378     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2379
2380     if inst.disk_template == constants.DT_FILE:
2381       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2382       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2383                                                 old_file_storage_dir,
2384                                                 new_file_storage_dir)
2385
2386       if not result:
2387         raise errors.OpExecError("Could not connect to node '%s' to rename"
2388                                  " directory '%s' to '%s' (but the instance"
2389                                  " has been renamed in Ganeti)" % (
2390                                  inst.primary_node, old_file_storage_dir,
2391                                  new_file_storage_dir))
2392
2393       if not result[0]:
2394         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2395                                  " (but the instance has been renamed in"
2396                                  " Ganeti)" % (old_file_storage_dir,
2397                                                new_file_storage_dir))
2398
2399     _StartInstanceDisks(self.cfg, inst, None)
2400     try:
2401       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2402                                           "sda", "sdb"):
2403         msg = ("Could run OS rename script for instance %s on node %s (but the"
2404                " instance has been renamed in Ganeti)" %
2405                (inst.name, inst.primary_node))
2406         logger.Error(msg)
2407     finally:
2408       _ShutdownInstanceDisks(inst, self.cfg)
2409
2410
2411 class LURemoveInstance(LogicalUnit):
2412   """Remove an instance.
2413
2414   """
2415   HPATH = "instance-remove"
2416   HTYPE = constants.HTYPE_INSTANCE
2417   _OP_REQP = ["instance_name"]
2418
2419   def BuildHooksEnv(self):
2420     """Build hooks env.
2421
2422     This runs on master, primary and secondary nodes of the instance.
2423
2424     """
2425     env = _BuildInstanceHookEnvByObject(self.instance)
2426     nl = [self.sstore.GetMasterNode()]
2427     return env, nl, nl
2428
2429   def CheckPrereq(self):
2430     """Check prerequisites.
2431
2432     This checks that the instance is in the cluster.
2433
2434     """
2435     instance = self.cfg.GetInstanceInfo(
2436       self.cfg.ExpandInstanceName(self.op.instance_name))
2437     if instance is None:
2438       raise errors.OpPrereqError("Instance '%s' not known" %
2439                                  self.op.instance_name)
2440     self.instance = instance
2441
2442   def Exec(self, feedback_fn):
2443     """Remove the instance.
2444
2445     """
2446     instance = self.instance
2447     logger.Info("shutting down instance %s on node %s" %
2448                 (instance.name, instance.primary_node))
2449
2450     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2451       if self.op.ignore_failures:
2452         feedback_fn("Warning: can't shutdown instance")
2453       else:
2454         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2455                                  (instance.name, instance.primary_node))
2456
2457     logger.Info("removing block devices for instance %s" % instance.name)
2458
2459     if not _RemoveDisks(instance, self.cfg):
2460       if self.op.ignore_failures:
2461         feedback_fn("Warning: can't remove instance's disks")
2462       else:
2463         raise errors.OpExecError("Can't remove instance's disks")
2464
2465     logger.Info("removing instance %s out of cluster config" % instance.name)
2466
2467     self.cfg.RemoveInstance(instance.name)
2468
2469
2470 class LUQueryInstances(NoHooksLU):
2471   """Logical unit for querying instances.
2472
2473   """
2474   _OP_REQP = ["output_fields", "names"]
2475
2476   def CheckPrereq(self):
2477     """Check prerequisites.
2478
2479     This checks that the fields required are valid output fields.
2480
2481     """
2482     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2483     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2484                                "admin_state", "admin_ram",
2485                                "disk_template", "ip", "mac", "bridge",
2486                                "sda_size", "sdb_size", "vcpus"],
2487                        dynamic=self.dynamic_fields,
2488                        selected=self.op.output_fields)
2489
2490     self.wanted = _GetWantedInstances(self, self.op.names)
2491
2492   def Exec(self, feedback_fn):
2493     """Computes the list of nodes and their attributes.
2494
2495     """
2496     instance_names = self.wanted
2497     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2498                      in instance_names]
2499
2500     # begin data gathering
2501
2502     nodes = frozenset([inst.primary_node for inst in instance_list])
2503
2504     bad_nodes = []
2505     if self.dynamic_fields.intersection(self.op.output_fields):
2506       live_data = {}
2507       node_data = rpc.call_all_instances_info(nodes)
2508       for name in nodes:
2509         result = node_data[name]
2510         if result:
2511           live_data.update(result)
2512         elif result == False:
2513           bad_nodes.append(name)
2514         # else no instance is alive
2515     else:
2516       live_data = dict([(name, {}) for name in instance_names])
2517
2518     # end data gathering
2519
2520     output = []
2521     for instance in instance_list:
2522       iout = []
2523       for field in self.op.output_fields:
2524         if field == "name":
2525           val = instance.name
2526         elif field == "os":
2527           val = instance.os
2528         elif field == "pnode":
2529           val = instance.primary_node
2530         elif field == "snodes":
2531           val = list(instance.secondary_nodes)
2532         elif field == "admin_state":
2533           val = (instance.status != "down")
2534         elif field == "oper_state":
2535           if instance.primary_node in bad_nodes:
2536             val = None
2537           else:
2538             val = bool(live_data.get(instance.name))
2539         elif field == "status":
2540           if instance.primary_node in bad_nodes:
2541             val = "ERROR_nodedown"
2542           else:
2543             running = bool(live_data.get(instance.name))
2544             if running:
2545               if instance.status != "down":
2546                 val = "running"
2547               else:
2548                 val = "ERROR_up"
2549             else:
2550               if instance.status != "down":
2551                 val = "ERROR_down"
2552               else:
2553                 val = "ADMIN_down"
2554         elif field == "admin_ram":
2555           val = instance.memory
2556         elif field == "oper_ram":
2557           if instance.primary_node in bad_nodes:
2558             val = None
2559           elif instance.name in live_data:
2560             val = live_data[instance.name].get("memory", "?")
2561           else:
2562             val = "-"
2563         elif field == "disk_template":
2564           val = instance.disk_template
2565         elif field == "ip":
2566           val = instance.nics[0].ip
2567         elif field == "bridge":
2568           val = instance.nics[0].bridge
2569         elif field == "mac":
2570           val = instance.nics[0].mac
2571         elif field == "sda_size" or field == "sdb_size":
2572           disk = instance.FindDisk(field[:3])
2573           if disk is None:
2574             val = None
2575           else:
2576             val = disk.size
2577         elif field == "vcpus":
2578           val = instance.vcpus
2579         else:
2580           raise errors.ParameterError(field)
2581         iout.append(val)
2582       output.append(iout)
2583
2584     return output
2585
2586
2587 class LUFailoverInstance(LogicalUnit):
2588   """Failover an instance.
2589
2590   """
2591   HPATH = "instance-failover"
2592   HTYPE = constants.HTYPE_INSTANCE
2593   _OP_REQP = ["instance_name", "ignore_consistency"]
2594
2595   def BuildHooksEnv(self):
2596     """Build hooks env.
2597
2598     This runs on master, primary and secondary nodes of the instance.
2599
2600     """
2601     env = {
2602       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2603       }
2604     env.update(_BuildInstanceHookEnvByObject(self.instance))
2605     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2606     return env, nl, nl
2607
2608   def CheckPrereq(self):
2609     """Check prerequisites.
2610
2611     This checks that the instance is in the cluster.
2612
2613     """
2614     instance = self.cfg.GetInstanceInfo(
2615       self.cfg.ExpandInstanceName(self.op.instance_name))
2616     if instance is None:
2617       raise errors.OpPrereqError("Instance '%s' not known" %
2618                                  self.op.instance_name)
2619
2620     if instance.disk_template not in constants.DTS_NET_MIRROR:
2621       raise errors.OpPrereqError("Instance's disk layout is not"
2622                                  " network mirrored, cannot failover.")
2623
2624     secondary_nodes = instance.secondary_nodes
2625     if not secondary_nodes:
2626       raise errors.ProgrammerError("no secondary node but using "
2627                                    "DT_REMOTE_RAID1 template")
2628
2629     target_node = secondary_nodes[0]
2630     # check memory requirements on the secondary node
2631     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2632                          instance.name, instance.memory)
2633
2634     # check bridge existance
2635     brlist = [nic.bridge for nic in instance.nics]
2636     if not rpc.call_bridges_exist(target_node, brlist):
2637       raise errors.OpPrereqError("One or more target bridges %s does not"
2638                                  " exist on destination node '%s'" %
2639                                  (brlist, target_node))
2640
2641     self.instance = instance
2642
2643   def Exec(self, feedback_fn):
2644     """Failover an instance.
2645
2646     The failover is done by shutting it down on its present node and
2647     starting it on the secondary.
2648
2649     """
2650     instance = self.instance
2651
2652     source_node = instance.primary_node
2653     target_node = instance.secondary_nodes[0]
2654
2655     feedback_fn("* checking disk consistency between source and target")
2656     for dev in instance.disks:
2657       # for remote_raid1, these are md over drbd
2658       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2659         if instance.status == "up" and not self.op.ignore_consistency:
2660           raise errors.OpExecError("Disk %s is degraded on target node,"
2661                                    " aborting failover." % dev.iv_name)
2662
2663     feedback_fn("* shutting down instance on source node")
2664     logger.Info("Shutting down instance %s on node %s" %
2665                 (instance.name, source_node))
2666
2667     if not rpc.call_instance_shutdown(source_node, instance):
2668       if self.op.ignore_consistency:
2669         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2670                      " anyway. Please make sure node %s is down"  %
2671                      (instance.name, source_node, source_node))
2672       else:
2673         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2674                                  (instance.name, source_node))
2675
2676     feedback_fn("* deactivating the instance's disks on source node")
2677     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2678       raise errors.OpExecError("Can't shut down the instance's disks.")
2679
2680     instance.primary_node = target_node
2681     # distribute new instance config to the other nodes
2682     self.cfg.AddInstance(instance)
2683
2684     # Only start the instance if it's marked as up
2685     if instance.status == "up":
2686       feedback_fn("* activating the instance's disks on target node")
2687       logger.Info("Starting instance %s on node %s" %
2688                   (instance.name, target_node))
2689
2690       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2691                                                ignore_secondaries=True)
2692       if not disks_ok:
2693         _ShutdownInstanceDisks(instance, self.cfg)
2694         raise errors.OpExecError("Can't activate the instance's disks")
2695
2696       feedback_fn("* starting the instance on the target node")
2697       if not rpc.call_instance_start(target_node, instance, None):
2698         _ShutdownInstanceDisks(instance, self.cfg)
2699         raise errors.OpExecError("Could not start instance %s on node %s." %
2700                                  (instance.name, target_node))
2701
2702
2703 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2704   """Create a tree of block devices on the primary node.
2705
2706   This always creates all devices.
2707
2708   """
2709   if device.children:
2710     for child in device.children:
2711       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2712         return False
2713
2714   cfg.SetDiskID(device, node)
2715   new_id = rpc.call_blockdev_create(node, device, device.size,
2716                                     instance.name, True, info)
2717   if not new_id:
2718     return False
2719   if device.physical_id is None:
2720     device.physical_id = new_id
2721   return True
2722
2723
2724 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2725   """Create a tree of block devices on a secondary node.
2726
2727   If this device type has to be created on secondaries, create it and
2728   all its children.
2729
2730   If not, just recurse to children keeping the same 'force' value.
2731
2732   """
2733   if device.CreateOnSecondary():
2734     force = True
2735   if device.children:
2736     for child in device.children:
2737       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2738                                         child, force, info):
2739         return False
2740
2741   if not force:
2742     return True
2743   cfg.SetDiskID(device, node)
2744   new_id = rpc.call_blockdev_create(node, device, device.size,
2745                                     instance.name, False, info)
2746   if not new_id:
2747     return False
2748   if device.physical_id is None:
2749     device.physical_id = new_id
2750   return True
2751
2752
2753 def _GenerateUniqueNames(cfg, exts):
2754   """Generate a suitable LV name.
2755
2756   This will generate a logical volume name for the given instance.
2757
2758   """
2759   results = []
2760   for val in exts:
2761     new_id = cfg.GenerateUniqueID()
2762     results.append("%s%s" % (new_id, val))
2763   return results
2764
2765
2766 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2767   """Generate a drbd device complete with its children.
2768
2769   """
2770   port = cfg.AllocatePort()
2771   vgname = cfg.GetVGName()
2772   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2773                           logical_id=(vgname, names[0]))
2774   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2775                           logical_id=(vgname, names[1]))
2776   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2777                           logical_id = (primary, secondary, port),
2778                           children = [dev_data, dev_meta])
2779   return drbd_dev
2780
2781
2782 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2783   """Generate a drbd8 device complete with its children.
2784
2785   """
2786   port = cfg.AllocatePort()
2787   vgname = cfg.GetVGName()
2788   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2789                           logical_id=(vgname, names[0]))
2790   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2791                           logical_id=(vgname, names[1]))
2792   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2793                           logical_id = (primary, secondary, port),
2794                           children = [dev_data, dev_meta],
2795                           iv_name=iv_name)
2796   return drbd_dev
2797
2798
2799 def _GenerateDiskTemplate(cfg, template_name,
2800                           instance_name, primary_node,
2801                           secondary_nodes, disk_sz, swap_sz,
2802                           file_storage_dir, file_driver):
2803   """Generate the entire disk layout for a given template type.
2804
2805   """
2806   #TODO: compute space requirements
2807
2808   vgname = cfg.GetVGName()
2809   if template_name == constants.DT_DISKLESS:
2810     disks = []
2811   elif template_name == constants.DT_PLAIN:
2812     if len(secondary_nodes) != 0:
2813       raise errors.ProgrammerError("Wrong template configuration")
2814
2815     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2816     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2817                            logical_id=(vgname, names[0]),
2818                            iv_name = "sda")
2819     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2820                            logical_id=(vgname, names[1]),
2821                            iv_name = "sdb")
2822     disks = [sda_dev, sdb_dev]
2823   elif template_name == constants.DT_DRBD8:
2824     if len(secondary_nodes) != 1:
2825       raise errors.ProgrammerError("Wrong template configuration")
2826     remote_node = secondary_nodes[0]
2827     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2828                                        ".sdb_data", ".sdb_meta"])
2829     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2830                                          disk_sz, names[0:2], "sda")
2831     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2832                                          swap_sz, names[2:4], "sdb")
2833     disks = [drbd_sda_dev, drbd_sdb_dev]
2834   elif template_name == constants.DT_FILE:
2835     if len(secondary_nodes) != 0:
2836       raise errors.ProgrammerError("Wrong template configuration")
2837
2838     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2839                                 iv_name="sda", logical_id=(file_driver,
2840                                 "%s/sda" % file_storage_dir))
2841     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2842                                 iv_name="sdb", logical_id=(file_driver,
2843                                 "%s/sdb" % file_storage_dir))
2844     disks = [file_sda_dev, file_sdb_dev]
2845   else:
2846     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2847   return disks
2848
2849
2850 def _GetInstanceInfoText(instance):
2851   """Compute that text that should be added to the disk's metadata.
2852
2853   """
2854   return "originstname+%s" % instance.name
2855
2856
2857 def _CreateDisks(cfg, instance):
2858   """Create all disks for an instance.
2859
2860   This abstracts away some work from AddInstance.
2861
2862   Args:
2863     instance: the instance object
2864
2865   Returns:
2866     True or False showing the success of the creation process
2867
2868   """
2869   info = _GetInstanceInfoText(instance)
2870
2871   if instance.disk_template == constants.DT_FILE:
2872     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2873     result = rpc.call_file_storage_dir_create(instance.primary_node,
2874                                               file_storage_dir)
2875
2876     if not result:
2877       logger.Error("Could not connect to node '%s'" % inst.primary_node)
2878       return False
2879
2880     if not result[0]:
2881       logger.Error("failed to create directory '%s'" % file_storage_dir)
2882       return False
2883
2884   for device in instance.disks:
2885     logger.Info("creating volume %s for instance %s" %
2886               (device.iv_name, instance.name))
2887     #HARDCODE
2888     for secondary_node in instance.secondary_nodes:
2889       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2890                                         device, False, info):
2891         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2892                      (device.iv_name, device, secondary_node))
2893         return False
2894     #HARDCODE
2895     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2896                                     instance, device, info):
2897       logger.Error("failed to create volume %s on primary!" %
2898                    device.iv_name)
2899       return False
2900   return True
2901
2902
2903 def _RemoveDisks(instance, cfg):
2904   """Remove all disks for an instance.
2905
2906   This abstracts away some work from `AddInstance()` and
2907   `RemoveInstance()`. Note that in case some of the devices couldn't
2908   be removed, the removal will continue with the other ones (compare
2909   with `_CreateDisks()`).
2910
2911   Args:
2912     instance: the instance object
2913
2914   Returns:
2915     True or False showing the success of the removal proces
2916
2917   """
2918   logger.Info("removing block devices for instance %s" % instance.name)
2919
2920   result = True
2921   for device in instance.disks:
2922     for node, disk in device.ComputeNodeTree(instance.primary_node):
2923       cfg.SetDiskID(disk, node)
2924       if not rpc.call_blockdev_remove(node, disk):
2925         logger.Error("could not remove block device %s on node %s,"
2926                      " continuing anyway" %
2927                      (device.iv_name, node))
2928         result = False
2929
2930   if instance.disk_template == constants.DT_FILE:
2931     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2932     if not rpc.call_file_storage_dir_remove(instance.primary_node,
2933                                             file_storage_dir):
2934       logger.Error("could not remove directory '%s'" % file_storage_dir)
2935       result = False
2936
2937   return result
2938
2939
2940 class LUCreateInstance(LogicalUnit):
2941   """Create an instance.
2942
2943   """
2944   HPATH = "instance-add"
2945   HTYPE = constants.HTYPE_INSTANCE
2946   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2947               "disk_template", "swap_size", "mode", "start", "vcpus",
2948               "wait_for_sync", "ip_check", "mac"]
2949
2950   def BuildHooksEnv(self):
2951     """Build hooks env.
2952
2953     This runs on master, primary and secondary nodes of the instance.
2954
2955     """
2956     env = {
2957       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2958       "INSTANCE_DISK_SIZE": self.op.disk_size,
2959       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2960       "INSTANCE_ADD_MODE": self.op.mode,
2961       }
2962     if self.op.mode == constants.INSTANCE_IMPORT:
2963       env["INSTANCE_SRC_NODE"] = self.op.src_node
2964       env["INSTANCE_SRC_PATH"] = self.op.src_path
2965       env["INSTANCE_SRC_IMAGE"] = self.src_image
2966
2967     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2968       primary_node=self.op.pnode,
2969       secondary_nodes=self.secondaries,
2970       status=self.instance_status,
2971       os_type=self.op.os_type,
2972       memory=self.op.mem_size,
2973       vcpus=self.op.vcpus,
2974       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2975     ))
2976
2977     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2978           self.secondaries)
2979     return env, nl, nl
2980
2981
2982   def CheckPrereq(self):
2983     """Check prerequisites.
2984
2985     """
2986     for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2987       if not hasattr(self.op, attr):
2988         setattr(self.op, attr, None)
2989
2990     if self.op.mode not in (constants.INSTANCE_CREATE,
2991                             constants.INSTANCE_IMPORT):
2992       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2993                                  self.op.mode)
2994
2995     if (not self.cfg.GetVGName() and
2996         self.op.disk_template not in constants.DTS_NOT_LVM):
2997       raise errors.OpPrereqError("Cluster does not support lvm-based"
2998                                  " instances")
2999
3000     if self.op.mode == constants.INSTANCE_IMPORT:
3001       src_node = getattr(self.op, "src_node", None)
3002       src_path = getattr(self.op, "src_path", None)
3003       if src_node is None or src_path is None:
3004         raise errors.OpPrereqError("Importing an instance requires source"
3005                                    " node and path options")
3006       src_node_full = self.cfg.ExpandNodeName(src_node)
3007       if src_node_full is None:
3008         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3009       self.op.src_node = src_node = src_node_full
3010
3011       if not os.path.isabs(src_path):
3012         raise errors.OpPrereqError("The source path must be absolute")
3013
3014       export_info = rpc.call_export_info(src_node, src_path)
3015
3016       if not export_info:
3017         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3018
3019       if not export_info.has_section(constants.INISECT_EXP):
3020         raise errors.ProgrammerError("Corrupted export config")
3021
3022       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3023       if (int(ei_version) != constants.EXPORT_VERSION):
3024         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3025                                    (ei_version, constants.EXPORT_VERSION))
3026
3027       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3028         raise errors.OpPrereqError("Can't import instance with more than"
3029                                    " one data disk")
3030
3031       # FIXME: are the old os-es, disk sizes, etc. useful?
3032       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3033       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3034                                                          'disk0_dump'))
3035       self.src_image = diskimage
3036     else: # INSTANCE_CREATE
3037       if getattr(self.op, "os_type", None) is None:
3038         raise errors.OpPrereqError("No guest OS specified")
3039
3040     # check primary node
3041     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3042     if pnode is None:
3043       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3044                                  self.op.pnode)
3045     self.op.pnode = pnode.name
3046     self.pnode = pnode
3047     self.secondaries = []
3048     # disk template and mirror node verification
3049     if self.op.disk_template not in constants.DISK_TEMPLATES:
3050       raise errors.OpPrereqError("Invalid disk template name")
3051
3052     if (self.op.file_driver and
3053         not self.op.file_driver in constants.FILE_DRIVER):
3054       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3055                                  self.op.file_driver)
3056
3057     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3058         raise errors.OpPrereqError("File storage directory not a relative"
3059                                    " path")
3060
3061     if self.op.disk_template in constants.DTS_NET_MIRROR:
3062       if getattr(self.op, "snode", None) is None:
3063         raise errors.OpPrereqError("The networked disk templates need"
3064                                    " a mirror node")
3065
3066       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3067       if snode_name is None:
3068         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3069                                    self.op.snode)
3070       elif snode_name == pnode.name:
3071         raise errors.OpPrereqError("The secondary node cannot be"
3072                                    " the primary node.")
3073       self.secondaries.append(snode_name)
3074
3075     # Required free disk space as a function of disk and swap space
3076     req_size_dict = {
3077       constants.DT_DISKLESS: None,
3078       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
3079       # 256 MB are added for drbd metadata, 128MB for each drbd device
3080       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
3081       constants.DT_FILE: None,
3082     }
3083
3084     if self.op.disk_template not in req_size_dict:
3085       raise errors.ProgrammerError("Disk template '%s' size requirement"
3086                                    " is unknown" %  self.op.disk_template)
3087
3088     req_size = req_size_dict[self.op.disk_template]
3089
3090     # Check lv size requirements
3091     if req_size is not None:
3092       nodenames = [pnode.name] + self.secondaries
3093       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3094       for node in nodenames:
3095         info = nodeinfo.get(node, None)
3096         if not info:
3097           raise errors.OpPrereqError("Cannot get current information"
3098                                      " from node '%s'" % nodeinfo)
3099         vg_free = info.get('vg_free', None)
3100         if not isinstance(vg_free, int):
3101           raise errors.OpPrereqError("Can't compute free disk space on"
3102                                      " node %s" % node)
3103         if req_size > info['vg_free']:
3104           raise errors.OpPrereqError("Not enough disk space on target node %s."
3105                                      " %d MB available, %d MB required" %
3106                                      (node, info['vg_free'], req_size))
3107
3108     # os verification
3109     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3110     if not os_obj:
3111       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3112                                  " primary node"  % self.op.os_type)
3113
3114     if self.op.kernel_path == constants.VALUE_NONE:
3115       raise errors.OpPrereqError("Can't set instance kernel to none")
3116
3117     # instance verification
3118     hostname1 = utils.HostInfo(self.op.instance_name)
3119
3120     self.op.instance_name = instance_name = hostname1.name
3121     instance_list = self.cfg.GetInstanceList()
3122     if instance_name in instance_list:
3123       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3124                                  instance_name)
3125
3126     ip = getattr(self.op, "ip", None)
3127     if ip is None or ip.lower() == "none":
3128       inst_ip = None
3129     elif ip.lower() == "auto":
3130       inst_ip = hostname1.ip
3131     else:
3132       if not utils.IsValidIP(ip):
3133         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3134                                    " like a valid IP" % ip)
3135       inst_ip = ip
3136     self.inst_ip = inst_ip
3137
3138     if self.op.start and not self.op.ip_check:
3139       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3140                                  " adding an instance in start mode")
3141
3142     if self.op.ip_check:
3143       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3144         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3145                                    (hostname1.ip, instance_name))
3146
3147     # MAC address verification
3148     if self.op.mac != "auto":
3149       if not utils.IsValidMac(self.op.mac.lower()):
3150         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3151                                    self.op.mac)
3152
3153     # bridge verification
3154     bridge = getattr(self.op, "bridge", None)
3155     if bridge is None:
3156       self.op.bridge = self.cfg.GetDefBridge()
3157     else:
3158       self.op.bridge = bridge
3159
3160     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3161       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3162                                  " destination node '%s'" %
3163                                  (self.op.bridge, pnode.name))
3164
3165     # boot order verification
3166     if self.op.hvm_boot_order is not None:
3167       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3168         raise errors.OpPrereqError("invalid boot order specified,"
3169                                    " must be one or more of [acdn]")
3170
3171     if self.op.start:
3172       self.instance_status = 'up'
3173     else:
3174       self.instance_status = 'down'
3175
3176   def Exec(self, feedback_fn):
3177     """Create and add the instance to the cluster.
3178
3179     """
3180     instance = self.op.instance_name
3181     pnode_name = self.pnode.name
3182
3183     if self.op.mac == "auto":
3184       mac_address = self.cfg.GenerateMAC()
3185     else:
3186       mac_address = self.op.mac
3187
3188     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3189     if self.inst_ip is not None:
3190       nic.ip = self.inst_ip
3191
3192     ht_kind = self.sstore.GetHypervisorType()
3193     if ht_kind in constants.HTS_REQ_PORT:
3194       network_port = self.cfg.AllocatePort()
3195     else:
3196       network_port = None
3197
3198     # build the full file storage dir path
3199     file_storage_dir = os.path.normpath(os.path.join(
3200                                         self.sstore.GetFileStorageDir(),
3201                                         self.op.file_storage_dir, instance))
3202
3203
3204     disks = _GenerateDiskTemplate(self.cfg,
3205                                   self.op.disk_template,
3206                                   instance, pnode_name,
3207                                   self.secondaries, self.op.disk_size,
3208                                   self.op.swap_size,
3209                                   file_storage_dir,
3210                                   self.op.file_driver)
3211
3212     iobj = objects.Instance(name=instance, os=self.op.os_type,
3213                             primary_node=pnode_name,
3214                             memory=self.op.mem_size,
3215                             vcpus=self.op.vcpus,
3216                             nics=[nic], disks=disks,
3217                             disk_template=self.op.disk_template,
3218                             status=self.instance_status,
3219                             network_port=network_port,
3220                             kernel_path=self.op.kernel_path,
3221                             initrd_path=self.op.initrd_path,
3222                             hvm_boot_order=self.op.hvm_boot_order,
3223                             )
3224
3225     feedback_fn("* creating instance disks...")
3226     if not _CreateDisks(self.cfg, iobj):
3227       _RemoveDisks(iobj, self.cfg)
3228       raise errors.OpExecError("Device creation failed, reverting...")
3229
3230     feedback_fn("adding instance %s to cluster config" % instance)
3231
3232     self.cfg.AddInstance(iobj)
3233
3234     if self.op.wait_for_sync:
3235       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3236     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3237       # make sure the disks are not degraded (still sync-ing is ok)
3238       time.sleep(15)
3239       feedback_fn("* checking mirrors status")
3240       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3241     else:
3242       disk_abort = False
3243
3244     if disk_abort:
3245       _RemoveDisks(iobj, self.cfg)
3246       self.cfg.RemoveInstance(iobj.name)
3247       raise errors.OpExecError("There are some degraded disks for"
3248                                " this instance")
3249
3250     feedback_fn("creating os for instance %s on node %s" %
3251                 (instance, pnode_name))
3252
3253     if iobj.disk_template != constants.DT_DISKLESS:
3254       if self.op.mode == constants.INSTANCE_CREATE:
3255         feedback_fn("* running the instance OS create scripts...")
3256         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3257           raise errors.OpExecError("could not add os for instance %s"
3258                                    " on node %s" %
3259                                    (instance, pnode_name))
3260
3261       elif self.op.mode == constants.INSTANCE_IMPORT:
3262         feedback_fn("* running the instance OS import scripts...")
3263         src_node = self.op.src_node
3264         src_image = self.src_image
3265         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3266                                                 src_node, src_image):
3267           raise errors.OpExecError("Could not import os for instance"
3268                                    " %s on node %s" %
3269                                    (instance, pnode_name))
3270       else:
3271         # also checked in the prereq part
3272         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3273                                      % self.op.mode)
3274
3275     if self.op.start:
3276       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3277       feedback_fn("* starting instance...")
3278       if not rpc.call_instance_start(pnode_name, iobj, None):
3279         raise errors.OpExecError("Could not start instance")
3280
3281
3282 class LUConnectConsole(NoHooksLU):
3283   """Connect to an instance's console.
3284
3285   This is somewhat special in that it returns the command line that
3286   you need to run on the master node in order to connect to the
3287   console.
3288
3289   """
3290   _OP_REQP = ["instance_name"]
3291
3292   def CheckPrereq(self):
3293     """Check prerequisites.
3294
3295     This checks that the instance is in the cluster.
3296
3297     """
3298     instance = self.cfg.GetInstanceInfo(
3299       self.cfg.ExpandInstanceName(self.op.instance_name))
3300     if instance is None:
3301       raise errors.OpPrereqError("Instance '%s' not known" %
3302                                  self.op.instance_name)
3303     self.instance = instance
3304
3305   def Exec(self, feedback_fn):
3306     """Connect to the console of an instance
3307
3308     """
3309     instance = self.instance
3310     node = instance.primary_node
3311
3312     node_insts = rpc.call_instance_list([node])[node]
3313     if node_insts is False:
3314       raise errors.OpExecError("Can't connect to node %s." % node)
3315
3316     if instance.name not in node_insts:
3317       raise errors.OpExecError("Instance %s is not running." % instance.name)
3318
3319     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3320
3321     hyper = hypervisor.GetHypervisor()
3322     console_cmd = hyper.GetShellCommandForConsole(instance)
3323
3324     # build ssh cmdline
3325     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3326
3327
3328 class LUReplaceDisks(LogicalUnit):
3329   """Replace the disks of an instance.
3330
3331   """
3332   HPATH = "mirrors-replace"
3333   HTYPE = constants.HTYPE_INSTANCE
3334   _OP_REQP = ["instance_name", "mode", "disks"]
3335
3336   def BuildHooksEnv(self):
3337     """Build hooks env.
3338
3339     This runs on the master, the primary and all the secondaries.
3340
3341     """
3342     env = {
3343       "MODE": self.op.mode,
3344       "NEW_SECONDARY": self.op.remote_node,
3345       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3346       }
3347     env.update(_BuildInstanceHookEnvByObject(self.instance))
3348     nl = [
3349       self.sstore.GetMasterNode(),
3350       self.instance.primary_node,
3351       ]
3352     if self.op.remote_node is not None:
3353       nl.append(self.op.remote_node)
3354     return env, nl, nl
3355
3356   def CheckPrereq(self):
3357     """Check prerequisites.
3358
3359     This checks that the instance is in the cluster.
3360
3361     """
3362     instance = self.cfg.GetInstanceInfo(
3363       self.cfg.ExpandInstanceName(self.op.instance_name))
3364     if instance is None:
3365       raise errors.OpPrereqError("Instance '%s' not known" %
3366                                  self.op.instance_name)
3367     self.instance = instance
3368     self.op.instance_name = instance.name
3369
3370     if instance.disk_template not in constants.DTS_NET_MIRROR:
3371       raise errors.OpPrereqError("Instance's disk layout is not"
3372                                  " network mirrored.")
3373
3374     if len(instance.secondary_nodes) != 1:
3375       raise errors.OpPrereqError("The instance has a strange layout,"
3376                                  " expected one secondary but found %d" %
3377                                  len(instance.secondary_nodes))
3378
3379     self.sec_node = instance.secondary_nodes[0]
3380
3381     remote_node = getattr(self.op, "remote_node", None)
3382     if remote_node is not None:
3383       remote_node = self.cfg.ExpandNodeName(remote_node)
3384       if remote_node is None:
3385         raise errors.OpPrereqError("Node '%s' not known" %
3386                                    self.op.remote_node)
3387       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3388     else:
3389       self.remote_node_info = None
3390     if remote_node == instance.primary_node:
3391       raise errors.OpPrereqError("The specified node is the primary node of"
3392                                  " the instance.")
3393     elif remote_node == self.sec_node:
3394       if self.op.mode == constants.REPLACE_DISK_SEC:
3395         # this is for DRBD8, where we can't execute the same mode of
3396         # replacement as for drbd7 (no different port allocated)
3397         raise errors.OpPrereqError("Same secondary given, cannot execute"
3398                                    " replacement")
3399       # the user gave the current secondary, switch to
3400       # 'no-replace-secondary' mode for drbd7
3401       remote_node = None
3402     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3403         self.op.mode != constants.REPLACE_DISK_ALL):
3404       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3405                                  " disks replacement, not individual ones")
3406     if instance.disk_template == constants.DT_DRBD8:
3407       if (self.op.mode == constants.REPLACE_DISK_ALL and
3408           remote_node is not None):
3409         # switch to replace secondary mode
3410         self.op.mode = constants.REPLACE_DISK_SEC
3411
3412       if self.op.mode == constants.REPLACE_DISK_ALL:
3413         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3414                                    " secondary disk replacement, not"
3415                                    " both at once")
3416       elif self.op.mode == constants.REPLACE_DISK_PRI:
3417         if remote_node is not None:
3418           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3419                                      " the secondary while doing a primary"
3420                                      " node disk replacement")
3421         self.tgt_node = instance.primary_node
3422         self.oth_node = instance.secondary_nodes[0]
3423       elif self.op.mode == constants.REPLACE_DISK_SEC:
3424         self.new_node = remote_node # this can be None, in which case
3425                                     # we don't change the secondary
3426         self.tgt_node = instance.secondary_nodes[0]
3427         self.oth_node = instance.primary_node
3428       else:
3429         raise errors.ProgrammerError("Unhandled disk replace mode")
3430
3431     for name in self.op.disks:
3432       if instance.FindDisk(name) is None:
3433         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3434                                    (name, instance.name))
3435     self.op.remote_node = remote_node
3436
3437   def _ExecRR1(self, feedback_fn):
3438     """Replace the disks of an instance.
3439
3440     """
3441     instance = self.instance
3442     iv_names = {}
3443     # start of work
3444     if self.op.remote_node is None:
3445       remote_node = self.sec_node
3446     else:
3447       remote_node = self.op.remote_node
3448     cfg = self.cfg
3449     for dev in instance.disks:
3450       size = dev.size
3451       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3452       names = _GenerateUniqueNames(cfg, lv_names)
3453       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3454                                        remote_node, size, names)
3455       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3456       logger.Info("adding new mirror component on secondary for %s" %
3457                   dev.iv_name)
3458       #HARDCODE
3459       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3460                                         new_drbd, False,
3461                                         _GetInstanceInfoText(instance)):
3462         raise errors.OpExecError("Failed to create new component on secondary"
3463                                  " node %s. Full abort, cleanup manually!" %
3464                                  remote_node)
3465
3466       logger.Info("adding new mirror component on primary")
3467       #HARDCODE
3468       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3469                                       instance, new_drbd,
3470                                       _GetInstanceInfoText(instance)):
3471         # remove secondary dev
3472         cfg.SetDiskID(new_drbd, remote_node)
3473         rpc.call_blockdev_remove(remote_node, new_drbd)
3474         raise errors.OpExecError("Failed to create volume on primary!"
3475                                  " Full abort, cleanup manually!!")
3476
3477       # the device exists now
3478       # call the primary node to add the mirror to md
3479       logger.Info("adding new mirror component to md")
3480       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3481                                            [new_drbd]):
3482         logger.Error("Can't add mirror compoment to md!")
3483         cfg.SetDiskID(new_drbd, remote_node)
3484         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3485           logger.Error("Can't rollback on secondary")
3486         cfg.SetDiskID(new_drbd, instance.primary_node)
3487         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3488           logger.Error("Can't rollback on primary")
3489         raise errors.OpExecError("Full abort, cleanup manually!!")
3490
3491       dev.children.append(new_drbd)
3492       cfg.AddInstance(instance)
3493
3494     # this can fail as the old devices are degraded and _WaitForSync
3495     # does a combined result over all disks, so we don't check its
3496     # return value
3497     _WaitForSync(cfg, instance, self.proc, unlock=True)
3498
3499     # so check manually all the devices
3500     for name in iv_names:
3501       dev, child, new_drbd = iv_names[name]
3502       cfg.SetDiskID(dev, instance.primary_node)
3503       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3504       if is_degr:
3505         raise errors.OpExecError("MD device %s is degraded!" % name)
3506       cfg.SetDiskID(new_drbd, instance.primary_node)
3507       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3508       if is_degr:
3509         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3510
3511     for name in iv_names:
3512       dev, child, new_drbd = iv_names[name]
3513       logger.Info("remove mirror %s component" % name)
3514       cfg.SetDiskID(dev, instance.primary_node)
3515       if not rpc.call_blockdev_removechildren(instance.primary_node,
3516                                               dev, [child]):
3517         logger.Error("Can't remove child from mirror, aborting"
3518                      " *this device cleanup*.\nYou need to cleanup manually!!")
3519         continue
3520
3521       for node in child.logical_id[:2]:
3522         logger.Info("remove child device on %s" % node)
3523         cfg.SetDiskID(child, node)
3524         if not rpc.call_blockdev_remove(node, child):
3525           logger.Error("Warning: failed to remove device from node %s,"
3526                        " continuing operation." % node)
3527
3528       dev.children.remove(child)
3529
3530       cfg.AddInstance(instance)
3531
3532   def _ExecD8DiskOnly(self, feedback_fn):
3533     """Replace a disk on the primary or secondary for dbrd8.
3534
3535     The algorithm for replace is quite complicated:
3536       - for each disk to be replaced:
3537         - create new LVs on the target node with unique names
3538         - detach old LVs from the drbd device
3539         - rename old LVs to name_replaced.<time_t>
3540         - rename new LVs to old LVs
3541         - attach the new LVs (with the old names now) to the drbd device
3542       - wait for sync across all devices
3543       - for each modified disk:
3544         - remove old LVs (which have the name name_replaces.<time_t>)
3545
3546     Failures are not very well handled.
3547
3548     """
3549     steps_total = 6
3550     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3551     instance = self.instance
3552     iv_names = {}
3553     vgname = self.cfg.GetVGName()
3554     # start of work
3555     cfg = self.cfg
3556     tgt_node = self.tgt_node
3557     oth_node = self.oth_node
3558
3559     # Step: check device activation
3560     self.proc.LogStep(1, steps_total, "check device existence")
3561     info("checking volume groups")
3562     my_vg = cfg.GetVGName()
3563     results = rpc.call_vg_list([oth_node, tgt_node])
3564     if not results:
3565       raise errors.OpExecError("Can't list volume groups on the nodes")
3566     for node in oth_node, tgt_node:
3567       res = results.get(node, False)
3568       if not res or my_vg not in res:
3569         raise errors.OpExecError("Volume group '%s' not found on %s" %
3570                                  (my_vg, node))
3571     for dev in instance.disks:
3572       if not dev.iv_name in self.op.disks:
3573         continue
3574       for node in tgt_node, oth_node:
3575         info("checking %s on %s" % (dev.iv_name, node))
3576         cfg.SetDiskID(dev, node)
3577         if not rpc.call_blockdev_find(node, dev):
3578           raise errors.OpExecError("Can't find device %s on node %s" %
3579                                    (dev.iv_name, node))
3580
3581     # Step: check other node consistency
3582     self.proc.LogStep(2, steps_total, "check peer consistency")
3583     for dev in instance.disks:
3584       if not dev.iv_name in self.op.disks:
3585         continue
3586       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3587       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3588                                    oth_node==instance.primary_node):
3589         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3590                                  " to replace disks on this node (%s)" %
3591                                  (oth_node, tgt_node))
3592
3593     # Step: create new storage
3594     self.proc.LogStep(3, steps_total, "allocate new storage")
3595     for dev in instance.disks:
3596       if not dev.iv_name in self.op.disks:
3597         continue
3598       size = dev.size
3599       cfg.SetDiskID(dev, tgt_node)
3600       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3601       names = _GenerateUniqueNames(cfg, lv_names)
3602       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3603                              logical_id=(vgname, names[0]))
3604       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3605                              logical_id=(vgname, names[1]))
3606       new_lvs = [lv_data, lv_meta]
3607       old_lvs = dev.children
3608       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3609       info("creating new local storage on %s for %s" %
3610            (tgt_node, dev.iv_name))
3611       # since we *always* want to create this LV, we use the
3612       # _Create...OnPrimary (which forces the creation), even if we
3613       # are talking about the secondary node
3614       for new_lv in new_lvs:
3615         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3616                                         _GetInstanceInfoText(instance)):
3617           raise errors.OpExecError("Failed to create new LV named '%s' on"
3618                                    " node '%s'" %
3619                                    (new_lv.logical_id[1], tgt_node))
3620
3621     # Step: for each lv, detach+rename*2+attach
3622     self.proc.LogStep(4, steps_total, "change drbd configuration")
3623     for dev, old_lvs, new_lvs in iv_names.itervalues():
3624       info("detaching %s drbd from local storage" % dev.iv_name)
3625       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3626         raise errors.OpExecError("Can't detach drbd from local storage on node"
3627                                  " %s for device %s" % (tgt_node, dev.iv_name))
3628       #dev.children = []
3629       #cfg.Update(instance)
3630
3631       # ok, we created the new LVs, so now we know we have the needed
3632       # storage; as such, we proceed on the target node to rename
3633       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3634       # using the assumption that logical_id == physical_id (which in
3635       # turn is the unique_id on that node)
3636
3637       # FIXME(iustin): use a better name for the replaced LVs
3638       temp_suffix = int(time.time())
3639       ren_fn = lambda d, suff: (d.physical_id[0],
3640                                 d.physical_id[1] + "_replaced-%s" % suff)
3641       # build the rename list based on what LVs exist on the node
3642       rlist = []
3643       for to_ren in old_lvs:
3644         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3645         if find_res is not None: # device exists
3646           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3647
3648       info("renaming the old LVs on the target node")
3649       if not rpc.call_blockdev_rename(tgt_node, rlist):
3650         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3651       # now we rename the new LVs to the old LVs
3652       info("renaming the new LVs on the target node")
3653       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3654       if not rpc.call_blockdev_rename(tgt_node, rlist):
3655         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3656
3657       for old, new in zip(old_lvs, new_lvs):
3658         new.logical_id = old.logical_id
3659         cfg.SetDiskID(new, tgt_node)
3660
3661       for disk in old_lvs:
3662         disk.logical_id = ren_fn(disk, temp_suffix)
3663         cfg.SetDiskID(disk, tgt_node)
3664
3665       # now that the new lvs have the old name, we can add them to the device
3666       info("adding new mirror component on %s" % tgt_node)
3667       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3668         for new_lv in new_lvs:
3669           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3670             warning("Can't rollback device %s", hint="manually cleanup unused"
3671                     " logical volumes")
3672         raise errors.OpExecError("Can't add local storage to drbd")
3673
3674       dev.children = new_lvs
3675       cfg.Update(instance)
3676
3677     # Step: wait for sync
3678
3679     # this can fail as the old devices are degraded and _WaitForSync
3680     # does a combined result over all disks, so we don't check its
3681     # return value
3682     self.proc.LogStep(5, steps_total, "sync devices")
3683     _WaitForSync(cfg, instance, self.proc, unlock=True)
3684
3685     # so check manually all the devices
3686     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3687       cfg.SetDiskID(dev, instance.primary_node)
3688       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3689       if is_degr:
3690         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3691
3692     # Step: remove old storage
3693     self.proc.LogStep(6, steps_total, "removing old storage")
3694     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3695       info("remove logical volumes for %s" % name)
3696       for lv in old_lvs:
3697         cfg.SetDiskID(lv, tgt_node)
3698         if not rpc.call_blockdev_remove(tgt_node, lv):
3699           warning("Can't remove old LV", hint="manually remove unused LVs")
3700           continue
3701
3702   def _ExecD8Secondary(self, feedback_fn):
3703     """Replace the secondary node for drbd8.
3704
3705     The algorithm for replace is quite complicated:
3706       - for all disks of the instance:
3707         - create new LVs on the new node with same names
3708         - shutdown the drbd device on the old secondary
3709         - disconnect the drbd network on the primary
3710         - create the drbd device on the new secondary
3711         - network attach the drbd on the primary, using an artifice:
3712           the drbd code for Attach() will connect to the network if it
3713           finds a device which is connected to the good local disks but
3714           not network enabled
3715       - wait for sync across all devices
3716       - remove all disks from the old secondary
3717
3718     Failures are not very well handled.
3719
3720     """
3721     steps_total = 6
3722     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3723     instance = self.instance
3724     iv_names = {}
3725     vgname = self.cfg.GetVGName()
3726     # start of work
3727     cfg = self.cfg
3728     old_node = self.tgt_node
3729     new_node = self.new_node
3730     pri_node = instance.primary_node
3731
3732     # Step: check device activation
3733     self.proc.LogStep(1, steps_total, "check device existence")
3734     info("checking volume groups")
3735     my_vg = cfg.GetVGName()
3736     results = rpc.call_vg_list([pri_node, new_node])
3737     if not results:
3738       raise errors.OpExecError("Can't list volume groups on the nodes")
3739     for node in pri_node, new_node:
3740       res = results.get(node, False)
3741       if not res or my_vg not in res:
3742         raise errors.OpExecError("Volume group '%s' not found on %s" %
3743                                  (my_vg, node))
3744     for dev in instance.disks:
3745       if not dev.iv_name in self.op.disks:
3746         continue
3747       info("checking %s on %s" % (dev.iv_name, pri_node))
3748       cfg.SetDiskID(dev, pri_node)
3749       if not rpc.call_blockdev_find(pri_node, dev):
3750         raise errors.OpExecError("Can't find device %s on node %s" %
3751                                  (dev.iv_name, pri_node))
3752
3753     # Step: check other node consistency
3754     self.proc.LogStep(2, steps_total, "check peer consistency")
3755     for dev in instance.disks:
3756       if not dev.iv_name in self.op.disks:
3757         continue
3758       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3759       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3760         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3761                                  " unsafe to replace the secondary" %
3762                                  pri_node)
3763
3764     # Step: create new storage
3765     self.proc.LogStep(3, steps_total, "allocate new storage")
3766     for dev in instance.disks:
3767       size = dev.size
3768       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3769       # since we *always* want to create this LV, we use the
3770       # _Create...OnPrimary (which forces the creation), even if we
3771       # are talking about the secondary node
3772       for new_lv in dev.children:
3773         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3774                                         _GetInstanceInfoText(instance)):
3775           raise errors.OpExecError("Failed to create new LV named '%s' on"
3776                                    " node '%s'" %
3777                                    (new_lv.logical_id[1], new_node))
3778
3779       iv_names[dev.iv_name] = (dev, dev.children)
3780
3781     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3782     for dev in instance.disks:
3783       size = dev.size
3784       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3785       # create new devices on new_node
3786       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3787                               logical_id=(pri_node, new_node,
3788                                           dev.logical_id[2]),
3789                               children=dev.children)
3790       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3791                                         new_drbd, False,
3792                                       _GetInstanceInfoText(instance)):
3793         raise errors.OpExecError("Failed to create new DRBD on"
3794                                  " node '%s'" % new_node)
3795
3796     for dev in instance.disks:
3797       # we have new devices, shutdown the drbd on the old secondary
3798       info("shutting down drbd for %s on old node" % dev.iv_name)
3799       cfg.SetDiskID(dev, old_node)
3800       if not rpc.call_blockdev_shutdown(old_node, dev):
3801         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3802                 hint="Please cleanup this device manually as soon as possible")
3803
3804     info("detaching primary drbds from the network (=> standalone)")
3805     done = 0
3806     for dev in instance.disks:
3807       cfg.SetDiskID(dev, pri_node)
3808       # set the physical (unique in bdev terms) id to None, meaning
3809       # detach from network
3810       dev.physical_id = (None,) * len(dev.physical_id)
3811       # and 'find' the device, which will 'fix' it to match the
3812       # standalone state
3813       if rpc.call_blockdev_find(pri_node, dev):
3814         done += 1
3815       else:
3816         warning("Failed to detach drbd %s from network, unusual case" %
3817                 dev.iv_name)
3818
3819     if not done:
3820       # no detaches succeeded (very unlikely)
3821       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3822
3823     # if we managed to detach at least one, we update all the disks of
3824     # the instance to point to the new secondary
3825     info("updating instance configuration")
3826     for dev in instance.disks:
3827       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3828       cfg.SetDiskID(dev, pri_node)
3829     cfg.Update(instance)
3830
3831     # and now perform the drbd attach
3832     info("attaching primary drbds to new secondary (standalone => connected)")
3833     failures = []
3834     for dev in instance.disks:
3835       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3836       # since the attach is smart, it's enough to 'find' the device,
3837       # it will automatically activate the network, if the physical_id
3838       # is correct
3839       cfg.SetDiskID(dev, pri_node)
3840       if not rpc.call_blockdev_find(pri_node, dev):
3841         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3842                 "please do a gnt-instance info to see the status of disks")
3843
3844     # this can fail as the old devices are degraded and _WaitForSync
3845     # does a combined result over all disks, so we don't check its
3846     # return value
3847     self.proc.LogStep(5, steps_total, "sync devices")
3848     _WaitForSync(cfg, instance, self.proc, unlock=True)
3849
3850     # so check manually all the devices
3851     for name, (dev, old_lvs) in iv_names.iteritems():
3852       cfg.SetDiskID(dev, pri_node)
3853       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3854       if is_degr:
3855         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3856
3857     self.proc.LogStep(6, steps_total, "removing old storage")
3858     for name, (dev, old_lvs) in iv_names.iteritems():
3859       info("remove logical volumes for %s" % name)
3860       for lv in old_lvs:
3861         cfg.SetDiskID(lv, old_node)
3862         if not rpc.call_blockdev_remove(old_node, lv):
3863           warning("Can't remove LV on old secondary",
3864                   hint="Cleanup stale volumes by hand")
3865
3866   def Exec(self, feedback_fn):
3867     """Execute disk replacement.
3868
3869     This dispatches the disk replacement to the appropriate handler.
3870
3871     """
3872     instance = self.instance
3873     if instance.disk_template == constants.DT_REMOTE_RAID1:
3874       fn = self._ExecRR1
3875     elif instance.disk_template == constants.DT_DRBD8:
3876       if self.op.remote_node is None:
3877         fn = self._ExecD8DiskOnly
3878       else:
3879         fn = self._ExecD8Secondary
3880     else:
3881       raise errors.ProgrammerError("Unhandled disk replacement case")
3882     return fn(feedback_fn)
3883
3884
3885 class LUQueryInstanceData(NoHooksLU):
3886   """Query runtime instance data.
3887
3888   """
3889   _OP_REQP = ["instances"]
3890
3891   def CheckPrereq(self):
3892     """Check prerequisites.
3893
3894     This only checks the optional instance list against the existing names.
3895
3896     """
3897     if not isinstance(self.op.instances, list):
3898       raise errors.OpPrereqError("Invalid argument type 'instances'")
3899     if self.op.instances:
3900       self.wanted_instances = []
3901       names = self.op.instances
3902       for name in names:
3903         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3904         if instance is None:
3905           raise errors.OpPrereqError("No such instance name '%s'" % name)
3906         self.wanted_instances.append(instance)
3907     else:
3908       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3909                                in self.cfg.GetInstanceList()]
3910     return
3911
3912
3913   def _ComputeDiskStatus(self, instance, snode, dev):
3914     """Compute block device status.
3915
3916     """
3917     self.cfg.SetDiskID(dev, instance.primary_node)
3918     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3919     if dev.dev_type in constants.LDS_DRBD:
3920       # we change the snode then (otherwise we use the one passed in)
3921       if dev.logical_id[0] == instance.primary_node:
3922         snode = dev.logical_id[1]
3923       else:
3924         snode = dev.logical_id[0]
3925
3926     if snode:
3927       self.cfg.SetDiskID(dev, snode)
3928       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3929     else:
3930       dev_sstatus = None
3931
3932     if dev.children:
3933       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3934                       for child in dev.children]
3935     else:
3936       dev_children = []
3937
3938     data = {
3939       "iv_name": dev.iv_name,
3940       "dev_type": dev.dev_type,
3941       "logical_id": dev.logical_id,
3942       "physical_id": dev.physical_id,
3943       "pstatus": dev_pstatus,
3944       "sstatus": dev_sstatus,
3945       "children": dev_children,
3946       }
3947
3948     return data
3949
3950   def Exec(self, feedback_fn):
3951     """Gather and return data"""
3952     result = {}
3953     for instance in self.wanted_instances:
3954       remote_info = rpc.call_instance_info(instance.primary_node,
3955                                                 instance.name)
3956       if remote_info and "state" in remote_info:
3957         remote_state = "up"
3958       else:
3959         remote_state = "down"
3960       if instance.status == "down":
3961         config_state = "down"
3962       else:
3963         config_state = "up"
3964
3965       disks = [self._ComputeDiskStatus(instance, None, device)
3966                for device in instance.disks]
3967
3968       idict = {
3969         "name": instance.name,
3970         "config_state": config_state,
3971         "run_state": remote_state,
3972         "pnode": instance.primary_node,
3973         "snodes": instance.secondary_nodes,
3974         "os": instance.os,
3975         "memory": instance.memory,
3976         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3977         "disks": disks,
3978         "network_port": instance.network_port,
3979         "vcpus": instance.vcpus,
3980         "kernel_path": instance.kernel_path,
3981         "initrd_path": instance.initrd_path,
3982         "hvm_boot_order": instance.hvm_boot_order,
3983         }
3984
3985       result[instance.name] = idict
3986
3987     return result
3988
3989
3990 class LUSetInstanceParams(LogicalUnit):
3991   """Modifies an instances's parameters.
3992
3993   """
3994   HPATH = "instance-modify"
3995   HTYPE = constants.HTYPE_INSTANCE
3996   _OP_REQP = ["instance_name"]
3997
3998   def BuildHooksEnv(self):
3999     """Build hooks env.
4000
4001     This runs on the master, primary and secondaries.
4002
4003     """
4004     args = dict()
4005     if self.mem:
4006       args['memory'] = self.mem
4007     if self.vcpus:
4008       args['vcpus'] = self.vcpus
4009     if self.do_ip or self.do_bridge or self.mac:
4010       if self.do_ip:
4011         ip = self.ip
4012       else:
4013         ip = self.instance.nics[0].ip
4014       if self.bridge:
4015         bridge = self.bridge
4016       else:
4017         bridge = self.instance.nics[0].bridge
4018       if self.mac:
4019         mac = self.mac
4020       else:
4021         mac = self.instance.nics[0].mac
4022       args['nics'] = [(ip, bridge, mac)]
4023     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4024     nl = [self.sstore.GetMasterNode(),
4025           self.instance.primary_node] + list(self.instance.secondary_nodes)
4026     return env, nl, nl
4027
4028   def CheckPrereq(self):
4029     """Check prerequisites.
4030
4031     This only checks the instance list against the existing names.
4032
4033     """
4034     self.mem = getattr(self.op, "mem", None)
4035     self.vcpus = getattr(self.op, "vcpus", None)
4036     self.ip = getattr(self.op, "ip", None)
4037     self.mac = getattr(self.op, "mac", None)
4038     self.bridge = getattr(self.op, "bridge", None)
4039     self.kernel_path = getattr(self.op, "kernel_path", None)
4040     self.initrd_path = getattr(self.op, "initrd_path", None)
4041     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4042     all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4043                   self.kernel_path, self.initrd_path, self.hvm_boot_order]
4044     if all_params.count(None) == len(all_params):
4045       raise errors.OpPrereqError("No changes submitted")
4046     if self.mem is not None:
4047       try:
4048         self.mem = int(self.mem)
4049       except ValueError, err:
4050         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4051     if self.vcpus is not None:
4052       try:
4053         self.vcpus = int(self.vcpus)
4054       except ValueError, err:
4055         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4056     if self.ip is not None:
4057       self.do_ip = True
4058       if self.ip.lower() == "none":
4059         self.ip = None
4060       else:
4061         if not utils.IsValidIP(self.ip):
4062           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4063     else:
4064       self.do_ip = False
4065     self.do_bridge = (self.bridge is not None)
4066     if self.mac is not None:
4067       if self.cfg.IsMacInUse(self.mac):
4068         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4069                                    self.mac)
4070       if not utils.IsValidMac(self.mac):
4071         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4072
4073     if self.kernel_path is not None:
4074       self.do_kernel_path = True
4075       if self.kernel_path == constants.VALUE_NONE:
4076         raise errors.OpPrereqError("Can't set instance to no kernel")
4077
4078       if self.kernel_path != constants.VALUE_DEFAULT:
4079         if not os.path.isabs(self.kernel_path):
4080           raise errors.OpPrereqError("The kernel path must be an absolute"
4081                                     " filename")
4082     else:
4083       self.do_kernel_path = False
4084
4085     if self.initrd_path is not None:
4086       self.do_initrd_path = True
4087       if self.initrd_path not in (constants.VALUE_NONE,
4088                                   constants.VALUE_DEFAULT):
4089         if not os.path.isabs(self.initrd_path):
4090           raise errors.OpPrereqError("The initrd path must be an absolute"
4091                                     " filename")
4092     else:
4093       self.do_initrd_path = False
4094
4095     # boot order verification
4096     if self.hvm_boot_order is not None:
4097       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4098         if len(self.hvm_boot_order.strip("acdn")) != 0:
4099           raise errors.OpPrereqError("invalid boot order specified,"
4100                                      " must be one or more of [acdn]"
4101                                      " or 'default'")
4102
4103     instance = self.cfg.GetInstanceInfo(
4104       self.cfg.ExpandInstanceName(self.op.instance_name))
4105     if instance is None:
4106       raise errors.OpPrereqError("No such instance name '%s'" %
4107                                  self.op.instance_name)
4108     self.op.instance_name = instance.name
4109     self.instance = instance
4110     return
4111
4112   def Exec(self, feedback_fn):
4113     """Modifies an instance.
4114
4115     All parameters take effect only at the next restart of the instance.
4116     """
4117     result = []
4118     instance = self.instance
4119     if self.mem:
4120       instance.memory = self.mem
4121       result.append(("mem", self.mem))
4122     if self.vcpus:
4123       instance.vcpus = self.vcpus
4124       result.append(("vcpus",  self.vcpus))
4125     if self.do_ip:
4126       instance.nics[0].ip = self.ip
4127       result.append(("ip", self.ip))
4128     if self.bridge:
4129       instance.nics[0].bridge = self.bridge
4130       result.append(("bridge", self.bridge))
4131     if self.mac:
4132       instance.nics[0].mac = self.mac
4133       result.append(("mac", self.mac))
4134     if self.do_kernel_path:
4135       instance.kernel_path = self.kernel_path
4136       result.append(("kernel_path", self.kernel_path))
4137     if self.do_initrd_path:
4138       instance.initrd_path = self.initrd_path
4139       result.append(("initrd_path", self.initrd_path))
4140     if self.hvm_boot_order:
4141       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4142         instance.hvm_boot_order = None
4143       else:
4144         instance.hvm_boot_order = self.hvm_boot_order
4145       result.append(("hvm_boot_order", self.hvm_boot_order))
4146
4147     self.cfg.AddInstance(instance)
4148
4149     return result
4150
4151
4152 class LUQueryExports(NoHooksLU):
4153   """Query the exports list
4154
4155   """
4156   _OP_REQP = []
4157
4158   def CheckPrereq(self):
4159     """Check that the nodelist contains only existing nodes.
4160
4161     """
4162     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4163
4164   def Exec(self, feedback_fn):
4165     """Compute the list of all the exported system images.
4166
4167     Returns:
4168       a dictionary with the structure node->(export-list)
4169       where export-list is a list of the instances exported on
4170       that node.
4171
4172     """
4173     return rpc.call_export_list(self.nodes)
4174
4175
4176 class LUExportInstance(LogicalUnit):
4177   """Export an instance to an image in the cluster.
4178
4179   """
4180   HPATH = "instance-export"
4181   HTYPE = constants.HTYPE_INSTANCE
4182   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4183
4184   def BuildHooksEnv(self):
4185     """Build hooks env.
4186
4187     This will run on the master, primary node and target node.
4188
4189     """
4190     env = {
4191       "EXPORT_NODE": self.op.target_node,
4192       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4193       }
4194     env.update(_BuildInstanceHookEnvByObject(self.instance))
4195     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4196           self.op.target_node]
4197     return env, nl, nl
4198
4199   def CheckPrereq(self):
4200     """Check prerequisites.
4201
4202     This checks that the instance name is a valid one.
4203
4204     """
4205     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4206     self.instance = self.cfg.GetInstanceInfo(instance_name)
4207     if self.instance is None:
4208       raise errors.OpPrereqError("Instance '%s' not found" %
4209                                  self.op.instance_name)
4210
4211     # node verification
4212     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4213     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4214
4215     if self.dst_node is None:
4216       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4217                                  self.op.target_node)
4218     self.op.target_node = self.dst_node.name
4219
4220   def Exec(self, feedback_fn):
4221     """Export an instance to an image in the cluster.
4222
4223     """
4224     instance = self.instance
4225     dst_node = self.dst_node
4226     src_node = instance.primary_node
4227     if self.op.shutdown:
4228       # shutdown the instance, but not the disks
4229       if not rpc.call_instance_shutdown(src_node, instance):
4230          raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4231                                  (instance.name, source_node))
4232
4233     vgname = self.cfg.GetVGName()
4234
4235     snap_disks = []
4236
4237     try:
4238       for disk in instance.disks:
4239         if disk.iv_name == "sda":
4240           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4241           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4242
4243           if not new_dev_name:
4244             logger.Error("could not snapshot block device %s on node %s" %
4245                          (disk.logical_id[1], src_node))
4246           else:
4247             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4248                                       logical_id=(vgname, new_dev_name),
4249                                       physical_id=(vgname, new_dev_name),
4250                                       iv_name=disk.iv_name)
4251             snap_disks.append(new_dev)
4252
4253     finally:
4254       if self.op.shutdown and instance.status == "up":
4255         if not rpc.call_instance_start(src_node, instance, None):
4256           _ShutdownInstanceDisks(instance, self.cfg)
4257           raise errors.OpExecError("Could not start instance")
4258
4259     # TODO: check for size
4260
4261     for dev in snap_disks:
4262       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4263                                            instance):
4264         logger.Error("could not export block device %s from node"
4265                      " %s to node %s" %
4266                      (dev.logical_id[1], src_node, dst_node.name))
4267       if not rpc.call_blockdev_remove(src_node, dev):
4268         logger.Error("could not remove snapshot block device %s from"
4269                      " node %s" % (dev.logical_id[1], src_node))
4270
4271     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4272       logger.Error("could not finalize export for instance %s on node %s" %
4273                    (instance.name, dst_node.name))
4274
4275     nodelist = self.cfg.GetNodeList()
4276     nodelist.remove(dst_node.name)
4277
4278     # on one-node clusters nodelist will be empty after the removal
4279     # if we proceed the backup would be removed because OpQueryExports
4280     # substitutes an empty list with the full cluster node list.
4281     if nodelist:
4282       op = opcodes.OpQueryExports(nodes=nodelist)
4283       exportlist = self.proc.ChainOpCode(op)
4284       for node in exportlist:
4285         if instance.name in exportlist[node]:
4286           if not rpc.call_export_remove(node, instance.name):
4287             logger.Error("could not remove older export for instance %s"
4288                          " on node %s" % (instance.name, node))
4289
4290
4291 class TagsLU(NoHooksLU):
4292   """Generic tags LU.
4293
4294   This is an abstract class which is the parent of all the other tags LUs.
4295
4296   """
4297   def CheckPrereq(self):
4298     """Check prerequisites.
4299
4300     """
4301     if self.op.kind == constants.TAG_CLUSTER:
4302       self.target = self.cfg.GetClusterInfo()
4303     elif self.op.kind == constants.TAG_NODE:
4304       name = self.cfg.ExpandNodeName(self.op.name)
4305       if name is None:
4306         raise errors.OpPrereqError("Invalid node name (%s)" %
4307                                    (self.op.name,))
4308       self.op.name = name
4309       self.target = self.cfg.GetNodeInfo(name)
4310     elif self.op.kind == constants.TAG_INSTANCE:
4311       name = self.cfg.ExpandInstanceName(self.op.name)
4312       if name is None:
4313         raise errors.OpPrereqError("Invalid instance name (%s)" %
4314                                    (self.op.name,))
4315       self.op.name = name
4316       self.target = self.cfg.GetInstanceInfo(name)
4317     else:
4318       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4319                                  str(self.op.kind))
4320
4321
4322 class LUGetTags(TagsLU):
4323   """Returns the tags of a given object.
4324
4325   """
4326   _OP_REQP = ["kind", "name"]
4327
4328   def Exec(self, feedback_fn):
4329     """Returns the tag list.
4330
4331     """
4332     return self.target.GetTags()
4333
4334
4335 class LUSearchTags(NoHooksLU):
4336   """Searches the tags for a given pattern.
4337
4338   """
4339   _OP_REQP = ["pattern"]
4340
4341   def CheckPrereq(self):
4342     """Check prerequisites.
4343
4344     This checks the pattern passed for validity by compiling it.
4345
4346     """
4347     try:
4348       self.re = re.compile(self.op.pattern)
4349     except re.error, err:
4350       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4351                                  (self.op.pattern, err))
4352
4353   def Exec(self, feedback_fn):
4354     """Returns the tag list.
4355
4356     """
4357     cfg = self.cfg
4358     tgts = [("/cluster", cfg.GetClusterInfo())]
4359     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4360     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4361     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4362     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4363     results = []
4364     for path, target in tgts:
4365       for tag in target.GetTags():
4366         if self.re.search(tag):
4367           results.append((path, tag))
4368     return results
4369
4370
4371 class LUAddTags(TagsLU):
4372   """Sets a tag on a given object.
4373
4374   """
4375   _OP_REQP = ["kind", "name", "tags"]
4376
4377   def CheckPrereq(self):
4378     """Check prerequisites.
4379
4380     This checks the type and length of the tag name and value.
4381
4382     """
4383     TagsLU.CheckPrereq(self)
4384     for tag in self.op.tags:
4385       objects.TaggableObject.ValidateTag(tag)
4386
4387   def Exec(self, feedback_fn):
4388     """Sets the tag.
4389
4390     """
4391     try:
4392       for tag in self.op.tags:
4393         self.target.AddTag(tag)
4394     except errors.TagError, err:
4395       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4396     try:
4397       self.cfg.Update(self.target)
4398     except errors.ConfigurationError:
4399       raise errors.OpRetryError("There has been a modification to the"
4400                                 " config file and the operation has been"
4401                                 " aborted. Please retry.")
4402
4403
4404 class LUDelTags(TagsLU):
4405   """Delete a list of tags from a given object.
4406
4407   """
4408   _OP_REQP = ["kind", "name", "tags"]
4409
4410   def CheckPrereq(self):
4411     """Check prerequisites.
4412
4413     This checks that we have the given tag.
4414
4415     """
4416     TagsLU.CheckPrereq(self)
4417     for tag in self.op.tags:
4418       objects.TaggableObject.ValidateTag(tag)
4419     del_tags = frozenset(self.op.tags)
4420     cur_tags = self.target.GetTags()
4421     if not del_tags <= cur_tags:
4422       diff_tags = del_tags - cur_tags
4423       diff_names = ["'%s'" % tag for tag in diff_tags]
4424       diff_names.sort()
4425       raise errors.OpPrereqError("Tag(s) %s not found" %
4426                                  (",".join(diff_names)))
4427
4428   def Exec(self, feedback_fn):
4429     """Remove the tag from the object.
4430
4431     """
4432     for tag in self.op.tags:
4433       self.target.RemoveTag(tag)
4434     try:
4435       self.cfg.Update(self.target)
4436     except errors.ConfigurationError:
4437       raise errors.OpRetryError("There has been a modification to the"
4438                                 " config file and the operation has been"
4439                                 " aborted. Please retry.")
4440
4441 class LUTestDelay(NoHooksLU):
4442   """Sleep for a specified amount of time.
4443
4444   This LU sleeps on the master and/or nodes for a specified amoutn of
4445   time.
4446
4447   """
4448   _OP_REQP = ["duration", "on_master", "on_nodes"]
4449
4450   def CheckPrereq(self):
4451     """Check prerequisites.
4452
4453     This checks that we have a good list of nodes and/or the duration
4454     is valid.
4455
4456     """
4457
4458     if self.op.on_nodes:
4459       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4460
4461   def Exec(self, feedback_fn):
4462     """Do the actual sleep.
4463
4464     """
4465     if self.op.on_master:
4466       if not utils.TestDelay(self.op.duration):
4467         raise errors.OpExecError("Error during master delay test")
4468     if self.op.on_nodes:
4469       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4470       if not result:
4471         raise errors.OpExecError("Complete failure from rpc call")
4472       for node, node_result in result.items():
4473         if not node_result:
4474           raise errors.OpExecError("Failure during rpc call to node %s,"
4475                                    " result: %s" % (node, node_result))