KVM: make the kernel and initrd arguments optional
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     logging.exception("Error while processing user ssh files")
264     return False
265
266   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
267     utils.WriteFile(name, data=content, mode=0600)
268
269   utils.AddAuthorizedKey(auth_keys, sshpub)
270
271   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
272
273   return True
274
275
276 def LeaveCluster():
277   """Cleans up and remove the current node.
278
279   This function cleans up and prepares the current node to be removed
280   from the cluster.
281
282   If processing is successful, then it raises an
283   L{errors.QuitGanetiException} which is used as a special case to
284   shutdown the node daemon.
285
286   """
287   _CleanDirectory(constants.DATA_DIR)
288   JobQueuePurge()
289
290   try:
291     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
292   except errors.OpExecError:
293     logging.exception("Error while processing ssh files")
294     return
295
296   f = open(pub_key, 'r')
297   try:
298     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
299   finally:
300     f.close()
301
302   utils.RemoveFile(priv_key)
303   utils.RemoveFile(pub_key)
304
305   # Return a reassuring string to the caller, and quit
306   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
307
308
309 def GetNodeInfo(vgname, hypervisor_type):
310   """Gives back a hash with different informations about the node.
311
312   @type vgname: C{string}
313   @param vgname: the name of the volume group to ask for disk space information
314   @type hypervisor_type: C{str}
315   @param hypervisor_type: the name of the hypervisor to ask for
316       memory information
317   @rtype: C{dict}
318   @return: dictionary with the following keys:
319       - vg_size is the size of the configured volume group in MiB
320       - vg_free is the free size of the volume group in MiB
321       - memory_dom0 is the memory allocated for domain0 in MiB
322       - memory_free is the currently available (free) ram in MiB
323       - memory_total is the total number of ram in MiB
324
325   """
326   outputarray = {}
327   vginfo = _GetVGInfo(vgname)
328   outputarray['vg_size'] = vginfo['vg_size']
329   outputarray['vg_free'] = vginfo['vg_free']
330
331   hyper = hypervisor.GetHypervisor(hypervisor_type)
332   hyp_info = hyper.GetNodeInfo()
333   if hyp_info is not None:
334     outputarray.update(hyp_info)
335
336   f = open("/proc/sys/kernel/random/boot_id", 'r')
337   try:
338     outputarray["bootid"] = f.read(128).rstrip("\n")
339   finally:
340     f.close()
341
342   return outputarray
343
344
345 def VerifyNode(what, cluster_name):
346   """Verify the status of the local node.
347
348   Based on the input L{what} parameter, various checks are done on the
349   local node.
350
351   If the I{filelist} key is present, this list of
352   files is checksummed and the file/checksum pairs are returned.
353
354   If the I{nodelist} key is present, we check that we have
355   connectivity via ssh with the target nodes (and check the hostname
356   report).
357
358   If the I{node-net-test} key is present, we check that we have
359   connectivity to the given nodes via both primary IP and, if
360   applicable, secondary IPs.
361
362   @type what: C{dict}
363   @param what: a dictionary of things to check:
364       - filelist: list of files for which to compute checksums
365       - nodelist: list of nodes we should check ssh communication with
366       - node-net-test: list of nodes we should check node daemon port
367         connectivity with
368       - hypervisor: list with hypervisors to run the verify for
369   @rtype: dict
370   @return: a dictionary with the same keys as the input dict, and
371       values representing the result of the checks
372
373   """
374   result = {}
375
376   if constants.NV_HYPERVISOR in what:
377     result[constants.NV_HYPERVISOR] = tmp = {}
378     for hv_name in what[constants.NV_HYPERVISOR]:
379       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
380
381   if constants.NV_FILELIST in what:
382     result[constants.NV_FILELIST] = utils.FingerprintFiles(
383       what[constants.NV_FILELIST])
384
385   if constants.NV_NODELIST in what:
386     result[constants.NV_NODELIST] = tmp = {}
387     random.shuffle(what[constants.NV_NODELIST])
388     for node in what[constants.NV_NODELIST]:
389       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
390       if not success:
391         tmp[node] = message
392
393   if constants.NV_NODENETTEST in what:
394     result[constants.NV_NODENETTEST] = tmp = {}
395     my_name = utils.HostInfo().name
396     my_pip = my_sip = None
397     for name, pip, sip in what[constants.NV_NODENETTEST]:
398       if name == my_name:
399         my_pip = pip
400         my_sip = sip
401         break
402     if not my_pip:
403       tmp[my_name] = ("Can't find my own primary/secondary IP"
404                       " in the node list")
405     else:
406       port = utils.GetNodeDaemonPort()
407       for name, pip, sip in what[constants.NV_NODENETTEST]:
408         fail = []
409         if not utils.TcpPing(pip, port, source=my_pip):
410           fail.append("primary")
411         if sip != pip:
412           if not utils.TcpPing(sip, port, source=my_sip):
413             fail.append("secondary")
414         if fail:
415           tmp[name] = ("failure using the %s interface(s)" %
416                        " and ".join(fail))
417
418   if constants.NV_LVLIST in what:
419     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
420
421   if constants.NV_INSTANCELIST in what:
422     result[constants.NV_INSTANCELIST] = GetInstanceList(
423       what[constants.NV_INSTANCELIST])
424
425   if constants.NV_VGLIST in what:
426     result[constants.NV_VGLIST] = ListVolumeGroups()
427
428   if constants.NV_VERSION in what:
429     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
430                                     constants.RELEASE_VERSION)
431
432   if constants.NV_HVINFO in what:
433     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
434     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
435
436   if constants.NV_DRBDLIST in what:
437     try:
438       used_minors = bdev.DRBD8.GetUsedDevs().keys()
439     except errors.BlockDeviceError:
440       logging.warning("Can't get used minors list", exc_info=True)
441       used_minors = []
442     result[constants.NV_DRBDLIST] = used_minors
443
444   return result
445
446
447 def GetVolumeList(vg_name):
448   """Compute list of logical volumes and their size.
449
450   @type vg_name: str
451   @param vg_name: the volume group whose LVs we should list
452   @rtype: dict
453   @return:
454       dictionary of all partions (key) with value being a tuple of
455       their size (in MiB), inactive and online status::
456
457         {'test1': ('20.06', True, True)}
458
459       in case of errors, a string is returned with the error
460       details.
461
462   """
463   lvs = {}
464   sep = '|'
465   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
466                          "--separator=%s" % sep,
467                          "-olv_name,lv_size,lv_attr", vg_name])
468   if result.failed:
469     logging.error("Failed to list logical volumes, lvs output: %s",
470                   result.output)
471     return result.output
472
473   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
474   for line in result.stdout.splitlines():
475     line = line.strip()
476     match = valid_line_re.match(line)
477     if not match:
478       logging.error("Invalid line returned from lvs output: '%s'", line)
479       continue
480     name, size, attr = match.groups()
481     inactive = attr[4] == '-'
482     online = attr[5] == 'o'
483     lvs[name] = (size, inactive, online)
484
485   return lvs
486
487
488 def ListVolumeGroups():
489   """List the volume groups and their size.
490
491   @rtype: dict
492   @return: dictionary with keys volume name and values the
493       size of the volume
494
495   """
496   return utils.ListVolumeGroups()
497
498
499 def NodeVolumes():
500   """List all volumes on this node.
501
502   @rtype: list
503   @return:
504     A list of dictionaries, each having four keys:
505       - name: the logical volume name,
506       - size: the size of the logical volume
507       - dev: the physical device on which the LV lives
508       - vg: the volume group to which it belongs
509
510     In case of errors, we return an empty list and log the
511     error.
512
513     Note that since a logical volume can live on multiple physical
514     volumes, the resulting list might include a logical volume
515     multiple times.
516
517   """
518   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
519                          "--separator=|",
520                          "--options=lv_name,lv_size,devices,vg_name"])
521   if result.failed:
522     logging.error("Failed to list logical volumes, lvs output: %s",
523                   result.output)
524     return []
525
526   def parse_dev(dev):
527     if '(' in dev:
528       return dev.split('(')[0]
529     else:
530       return dev
531
532   def map_line(line):
533     return {
534       'name': line[0].strip(),
535       'size': line[1].strip(),
536       'dev': parse_dev(line[2].strip()),
537       'vg': line[3].strip(),
538     }
539
540   return [map_line(line.split('|')) for line in result.stdout.splitlines()
541           if line.count('|') >= 3]
542
543
544 def BridgesExist(bridges_list):
545   """Check if a list of bridges exist on the current node.
546
547   @rtype: boolean
548   @return: C{True} if all of them exist, C{False} otherwise
549
550   """
551   for bridge in bridges_list:
552     if not utils.BridgeExists(bridge):
553       return False
554
555   return True
556
557
558 def GetInstanceList(hypervisor_list):
559   """Provides a list of instances.
560
561   @type hypervisor_list: list
562   @param hypervisor_list: the list of hypervisors to query information
563
564   @rtype: list
565   @return: a list of all running instances on the current node
566     - instance1.example.com
567     - instance2.example.com
568
569   """
570   results = []
571   for hname in hypervisor_list:
572     try:
573       names = hypervisor.GetHypervisor(hname).ListInstances()
574       results.extend(names)
575     except errors.HypervisorError, err:
576       logging.exception("Error enumerating instances for hypevisor %s", hname)
577       # FIXME: should we somehow not propagate this to the master?
578       raise
579
580   return results
581
582
583 def GetInstanceInfo(instance, hname):
584   """Gives back the informations about an instance as a dictionary.
585
586   @type instance: string
587   @param instance: the instance name
588   @type hname: string
589   @param hname: the hypervisor type of the instance
590
591   @rtype: dict
592   @return: dictionary with the following keys:
593       - memory: memory size of instance (int)
594       - state: xen state of instance (string)
595       - time: cpu time of instance (float)
596
597   """
598   output = {}
599
600   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
601   if iinfo is not None:
602     output['memory'] = iinfo[2]
603     output['state'] = iinfo[4]
604     output['time'] = iinfo[5]
605
606   return output
607
608
609 def GetInstanceMigratable(instance):
610   """Gives whether an instance can be migrated.
611
612   @type instance: L{objects.Instance}
613   @param instance: object representing the instance to be checked.
614
615   @rtype: tuple
616   @return: tuple of (result, description) where:
617       - result: whether the instance can be migrated or not
618       - description: a description of the issue, if relevant
619
620   """
621   hyper = hypervisor.GetHypervisor(instance.hypervisor)
622   if instance.name not in hyper.ListInstances():
623     return (False, 'not running')
624
625   for idx in range(len(instance.disks)):
626     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
627     if not os.path.islink(link_name):
628       return (False, 'not restarted since ganeti 1.2.5')
629
630   return (True, '')
631
632
633 def GetAllInstancesInfo(hypervisor_list):
634   """Gather data about all instances.
635
636   This is the equivalent of L{GetInstanceInfo}, except that it
637   computes data for all instances at once, thus being faster if one
638   needs data about more than one instance.
639
640   @type hypervisor_list: list
641   @param hypervisor_list: list of hypervisors to query for instance data
642
643   @rtype: dict
644   @return: dictionary of instance: data, with data having the following keys:
645       - memory: memory size of instance (int)
646       - state: xen state of instance (string)
647       - time: cpu time of instance (float)
648       - vcpus: the number of vcpus
649
650   """
651   output = {}
652
653   for hname in hypervisor_list:
654     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
655     if iinfo:
656       for name, inst_id, memory, vcpus, state, times in iinfo:
657         value = {
658           'memory': memory,
659           'vcpus': vcpus,
660           'state': state,
661           'time': times,
662           }
663         if name in output and output[name] != value:
664           raise errors.HypervisorError("Instance %s running duplicate"
665                                        " with different parameters" % name)
666         output[name] = value
667
668   return output
669
670
671 def AddOSToInstance(instance):
672   """Add an OS to an instance.
673
674   @type instance: L{objects.Instance}
675   @param instance: Instance whose OS is to be installed
676   @rtype: boolean
677   @return: the success of the operation
678
679   """
680   inst_os = OSFromDisk(instance.os)
681
682   create_env = OSEnvironment(instance)
683
684   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
685                                      instance.name, int(time.time()))
686
687   result = utils.RunCmd([inst_os.create_script], env=create_env,
688                         cwd=inst_os.path, output=logfile,)
689   if result.failed:
690     logging.error("os create command '%s' returned error: %s, logfile: %s,"
691                   " output: %s", result.cmd, result.fail_reason, logfile,
692                   result.output)
693     lines = [val.encode("string_escape")
694              for val in utils.TailFile(logfile, lines=20)]
695     return (False, "OS create script failed (%s), last lines in the"
696             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
697
698   return (True, "Successfully installed")
699
700
701 def RunRenameInstance(instance, old_name):
702   """Run the OS rename script for an instance.
703
704   @type instance: L{objects.Instance}
705   @param instance: Instance whose OS is to be installed
706   @type old_name: string
707   @param old_name: previous instance name
708   @rtype: boolean
709   @return: the success of the operation
710
711   """
712   inst_os = OSFromDisk(instance.os)
713
714   rename_env = OSEnvironment(instance)
715   rename_env['OLD_INSTANCE_NAME'] = old_name
716
717   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
718                                            old_name,
719                                            instance.name, int(time.time()))
720
721   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
722                         cwd=inst_os.path, output=logfile)
723
724   if result.failed:
725     logging.error("os create command '%s' returned error: %s output: %s",
726                   result.cmd, result.fail_reason, result.output)
727     lines = [val.encode("string_escape")
728              for val in utils.TailFile(logfile, lines=20)]
729     return (False, "OS rename script failed (%s), last lines in the"
730             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
731
732   return (True, "Rename successful")
733
734
735 def _GetVGInfo(vg_name):
736   """Get informations about the volume group.
737
738   @type vg_name: str
739   @param vg_name: the volume group which we query
740   @rtype: dict
741   @return:
742     A dictionary with the following keys:
743       - C{vg_size} is the total size of the volume group in MiB
744       - C{vg_free} is the free size of the volume group in MiB
745       - C{pv_count} are the number of physical disks in that VG
746
747     If an error occurs during gathering of data, we return the same dict
748     with keys all set to None.
749
750   """
751   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
752
753   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
754                          "--nosuffix", "--units=m", "--separator=:", vg_name])
755
756   if retval.failed:
757     logging.error("volume group %s not present", vg_name)
758     return retdic
759   valarr = retval.stdout.strip().rstrip(':').split(':')
760   if len(valarr) == 3:
761     try:
762       retdic = {
763         "vg_size": int(round(float(valarr[0]), 0)),
764         "vg_free": int(round(float(valarr[1]), 0)),
765         "pv_count": int(valarr[2]),
766         }
767     except ValueError, err:
768       logging.exception("Fail to parse vgs output")
769   else:
770     logging.error("vgs output has the wrong number of fields (expected"
771                   " three): %s", str(valarr))
772   return retdic
773
774
775 def _GetBlockDevSymlinkPath(instance_name, idx):
776   return os.path.join(constants.DISK_LINKS_DIR,
777                       "%s:%d" % (instance_name, idx))
778
779
780 def _SymlinkBlockDev(instance_name, device_path, idx):
781   """Set up symlinks to a instance's block device.
782
783   This is an auxiliary function run when an instance is start (on the primary
784   node) or when an instance is migrated (on the target node).
785
786
787   @param instance_name: the name of the target instance
788   @param device_path: path of the physical block device, on the node
789   @param idx: the disk index
790   @return: absolute path to the disk's symlink
791
792   """
793   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
794   try:
795     os.symlink(device_path, link_name)
796   except OSError, err:
797     if err.errno == errno.EEXIST:
798       if (not os.path.islink(link_name) or
799           os.readlink(link_name) != device_path):
800         os.remove(link_name)
801         os.symlink(device_path, link_name)
802     else:
803       raise
804
805   return link_name
806
807
808 def _RemoveBlockDevLinks(instance_name, disks):
809   """Remove the block device symlinks belonging to the given instance.
810
811   """
812   for idx, disk in enumerate(disks):
813     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
814     if os.path.islink(link_name):
815       try:
816         os.remove(link_name)
817       except OSError:
818         logging.exception("Can't remove symlink '%s'", link_name)
819
820
821 def _GatherAndLinkBlockDevs(instance):
822   """Set up an instance's block device(s).
823
824   This is run on the primary node at instance startup. The block
825   devices must be already assembled.
826
827   @type instance: L{objects.Instance}
828   @param instance: the instance whose disks we shoul assemble
829   @rtype: list
830   @return: list of (disk_object, device_path)
831
832   """
833   block_devices = []
834   for idx, disk in enumerate(instance.disks):
835     device = _RecursiveFindBD(disk)
836     if device is None:
837       raise errors.BlockDeviceError("Block device '%s' is not set up." %
838                                     str(disk))
839     device.Open()
840     try:
841       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
842     except OSError, e:
843       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
844                                     e.strerror)
845
846     block_devices.append((disk, link_name))
847
848   return block_devices
849
850
851 def StartInstance(instance, extra_args):
852   """Start an instance.
853
854   @type instance: L{objects.Instance}
855   @param instance: the instance object
856   @rtype: boolean
857   @return: whether the startup was successful or not
858
859   """
860   running_instances = GetInstanceList([instance.hypervisor])
861
862   if instance.name in running_instances:
863     return (True, "Already running")
864
865   try:
866     block_devices = _GatherAndLinkBlockDevs(instance)
867     hyper = hypervisor.GetHypervisor(instance.hypervisor)
868     hyper.StartInstance(instance, block_devices, extra_args)
869   except errors.BlockDeviceError, err:
870     logging.exception("Failed to start instance")
871     return (False, "Block device error: %s" % str(err))
872   except errors.HypervisorError, err:
873     logging.exception("Failed to start instance")
874     _RemoveBlockDevLinks(instance.name, instance.disks)
875     return (False, "Hypervisor error: %s" % str(err))
876
877   return (True, "Instance started successfully")
878
879
880 def ShutdownInstance(instance):
881   """Shut an instance down.
882
883   @note: this functions uses polling with a hardcoded timeout.
884
885   @type instance: L{objects.Instance}
886   @param instance: the instance object
887   @rtype: boolean
888   @return: whether the startup was successful or not
889
890   """
891   hv_name = instance.hypervisor
892   running_instances = GetInstanceList([hv_name])
893
894   if instance.name not in running_instances:
895     return True
896
897   hyper = hypervisor.GetHypervisor(hv_name)
898   try:
899     hyper.StopInstance(instance)
900   except errors.HypervisorError, err:
901     logging.error("Failed to stop instance: %s" % err)
902     return False
903
904   # test every 10secs for 2min
905
906   time.sleep(1)
907   for dummy in range(11):
908     if instance.name not in GetInstanceList([hv_name]):
909       break
910     time.sleep(10)
911   else:
912     # the shutdown did not succeed
913     logging.error("Shutdown of '%s' unsuccessful, using destroy",
914                   instance.name)
915
916     try:
917       hyper.StopInstance(instance, force=True)
918     except errors.HypervisorError, err:
919       logging.exception("Failed to stop instance: %s" % err)
920       return False
921
922     time.sleep(1)
923     if instance.name in GetInstanceList([hv_name]):
924       logging.error("Could not shutdown instance '%s' even by destroy",
925                     instance.name)
926       return False
927
928   _RemoveBlockDevLinks(instance.name, instance.disks)
929
930   return True
931
932
933 def RebootInstance(instance, reboot_type, extra_args):
934   """Reboot an instance.
935
936   @type instance: L{objects.Instance}
937   @param instance: the instance object to reboot
938   @type reboot_type: str
939   @param reboot_type: the type of reboot, one the following
940     constants:
941       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
942         instance OS, do not recreate the VM
943       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
944         restart the VM (at the hypervisor level)
945       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
946         is not accepted here, since that mode is handled
947         differently
948   @rtype: boolean
949   @return: the success of the operation
950
951   """
952   running_instances = GetInstanceList([instance.hypervisor])
953
954   if instance.name not in running_instances:
955     logging.error("Cannot reboot instance that is not running")
956     return False
957
958   hyper = hypervisor.GetHypervisor(instance.hypervisor)
959   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
960     try:
961       hyper.RebootInstance(instance)
962     except errors.HypervisorError, err:
963       logging.exception("Failed to soft reboot instance")
964       return False
965   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
966     try:
967       ShutdownInstance(instance)
968       StartInstance(instance, extra_args)
969     except errors.HypervisorError, err:
970       logging.exception("Failed to hard reboot instance")
971       return False
972   else:
973     raise errors.ParameterError("reboot_type invalid")
974
975   return True
976
977
978 def MigrationInfo(instance):
979   """Gather information about an instance to be migrated.
980
981   @type instance: L{objects.Instance}
982   @param instance: the instance definition
983
984   """
985   hyper = hypervisor.GetHypervisor(instance.hypervisor)
986   try:
987     info = hyper.MigrationInfo(instance)
988   except errors.HypervisorError, err:
989     msg = "Failed to fetch migration information"
990     logging.exception(msg)
991     return (False, '%s: %s' % (msg, err))
992   return (True, info)
993
994
995 def AcceptInstance(instance, info, target):
996   """Prepare the node to accept an instance.
997
998   @type instance: L{objects.Instance}
999   @param instance: the instance definition
1000   @type info: string/data (opaque)
1001   @param info: migration information, from the source node
1002   @type target: string
1003   @param target: target host (usually ip), on this node
1004
1005   """
1006   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1007   try:
1008     hyper.AcceptInstance(instance, info, target)
1009   except errors.HypervisorError, err:
1010     msg = "Failed to accept instance"
1011     logging.exception(msg)
1012     return (False, '%s: %s' % (msg, err))
1013   return (True, "Accept successfull")
1014
1015
1016 def FinalizeMigration(instance, info, success):
1017   """Finalize any preparation to accept an instance.
1018
1019   @type instance: L{objects.Instance}
1020   @param instance: the instance definition
1021   @type info: string/data (opaque)
1022   @param info: migration information, from the source node
1023   @type success: boolean
1024   @param success: whether the migration was a success or a failure
1025
1026   """
1027   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1028   try:
1029     hyper.FinalizeMigration(instance, info, success)
1030   except errors.HypervisorError, err:
1031     msg = "Failed to finalize migration"
1032     logging.exception(msg)
1033     return (False, '%s: %s' % (msg, err))
1034   return (True, "Migration Finalized")
1035
1036
1037 def MigrateInstance(instance, target, live):
1038   """Migrates an instance to another node.
1039
1040   @type instance: L{objects.Instance}
1041   @param instance: the instance definition
1042   @type target: string
1043   @param target: the target node name
1044   @type live: boolean
1045   @param live: whether the migration should be done live or not (the
1046       interpretation of this parameter is left to the hypervisor)
1047   @rtype: tuple
1048   @return: a tuple of (success, msg) where:
1049       - succes is a boolean denoting the success/failure of the operation
1050       - msg is a string with details in case of failure
1051
1052   """
1053   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1054
1055   try:
1056     hyper.MigrateInstance(instance.name, target, live)
1057   except errors.HypervisorError, err:
1058     msg = "Failed to migrate instance"
1059     logging.exception(msg)
1060     return (False, "%s: %s" % (msg, err))
1061   return (True, "Migration successfull")
1062
1063
1064 def CreateBlockDevice(disk, size, owner, on_primary, info):
1065   """Creates a block device for an instance.
1066
1067   @type disk: L{objects.Disk}
1068   @param disk: the object describing the disk we should create
1069   @type size: int
1070   @param size: the size of the physical underlying device, in MiB
1071   @type owner: str
1072   @param owner: the name of the instance for which disk is created,
1073       used for device cache data
1074   @type on_primary: boolean
1075   @param on_primary:  indicates if it is the primary node or not
1076   @type info: string
1077   @param info: string that will be sent to the physical device
1078       creation, used for example to set (LVM) tags on LVs
1079
1080   @return: the new unique_id of the device (this can sometime be
1081       computed only after creation), or None. On secondary nodes,
1082       it's not required to return anything.
1083
1084   """
1085   clist = []
1086   if disk.children:
1087     for child in disk.children:
1088       crdev = _RecursiveAssembleBD(child, owner, on_primary)
1089       if on_primary or disk.AssembleOnSecondary():
1090         # we need the children open in case the device itself has to
1091         # be assembled
1092         crdev.Open()
1093       clist.append(crdev)
1094
1095   try:
1096     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1097   except errors.GenericError, err:
1098     return False, "Can't create block device: %s" % str(err)
1099
1100   if on_primary or disk.AssembleOnSecondary():
1101     if not device.Assemble():
1102       errorstring = "Can't assemble device after creation, very unusual event"
1103       logging.error(errorstring)
1104       return False, errorstring
1105     device.SetSyncSpeed(constants.SYNC_SPEED)
1106     if on_primary or disk.OpenOnSecondary():
1107       device.Open(force=True)
1108     DevCacheManager.UpdateCache(device.dev_path, owner,
1109                                 on_primary, disk.iv_name)
1110
1111   device.SetInfo(info)
1112
1113   physical_id = device.unique_id
1114   return True, physical_id
1115
1116
1117 def RemoveBlockDevice(disk):
1118   """Remove a block device.
1119
1120   @note: This is intended to be called recursively.
1121
1122   @type disk: L{objects.Disk}
1123   @param disk: the disk object we should remove
1124   @rtype: boolean
1125   @return: the success of the operation
1126
1127   """
1128   try:
1129     rdev = _RecursiveFindBD(disk)
1130   except errors.BlockDeviceError, err:
1131     # probably can't attach
1132     logging.info("Can't attach to device %s in remove", disk)
1133     rdev = None
1134   if rdev is not None:
1135     r_path = rdev.dev_path
1136     result = rdev.Remove()
1137     if result:
1138       DevCacheManager.RemoveCache(r_path)
1139   else:
1140     result = True
1141   if disk.children:
1142     for child in disk.children:
1143       result = result and RemoveBlockDevice(child)
1144   return result
1145
1146
1147 def _RecursiveAssembleBD(disk, owner, as_primary):
1148   """Activate a block device for an instance.
1149
1150   This is run on the primary and secondary nodes for an instance.
1151
1152   @note: this function is called recursively.
1153
1154   @type disk: L{objects.Disk}
1155   @param disk: the disk we try to assemble
1156   @type owner: str
1157   @param owner: the name of the instance which owns the disk
1158   @type as_primary: boolean
1159   @param as_primary: if we should make the block device
1160       read/write
1161
1162   @return: the assembled device or None (in case no device
1163       was assembled)
1164   @raise errors.BlockDeviceError: in case there is an error
1165       during the activation of the children or the device
1166       itself
1167
1168   """
1169   children = []
1170   if disk.children:
1171     mcn = disk.ChildrenNeeded()
1172     if mcn == -1:
1173       mcn = 0 # max number of Nones allowed
1174     else:
1175       mcn = len(disk.children) - mcn # max number of Nones
1176     for chld_disk in disk.children:
1177       try:
1178         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1179       except errors.BlockDeviceError, err:
1180         if children.count(None) >= mcn:
1181           raise
1182         cdev = None
1183         logging.debug("Error in child activation: %s", str(err))
1184       children.append(cdev)
1185
1186   if as_primary or disk.AssembleOnSecondary():
1187     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1188     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1189     result = r_dev
1190     if as_primary or disk.OpenOnSecondary():
1191       r_dev.Open()
1192     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1193                                 as_primary, disk.iv_name)
1194
1195   else:
1196     result = True
1197   return result
1198
1199
1200 def AssembleBlockDevice(disk, owner, as_primary):
1201   """Activate a block device for an instance.
1202
1203   This is a wrapper over _RecursiveAssembleBD.
1204
1205   @rtype: str or boolean
1206   @return: a C{/dev/...} path for primary nodes, and
1207       C{True} for secondary nodes
1208
1209   """
1210   result = _RecursiveAssembleBD(disk, owner, as_primary)
1211   if isinstance(result, bdev.BlockDev):
1212     result = result.dev_path
1213   return result
1214
1215
1216 def ShutdownBlockDevice(disk):
1217   """Shut down a block device.
1218
1219   First, if the device is assembled (Attach() is successfull), then
1220   the device is shutdown. Then the children of the device are
1221   shutdown.
1222
1223   This function is called recursively. Note that we don't cache the
1224   children or such, as oppossed to assemble, shutdown of different
1225   devices doesn't require that the upper device was active.
1226
1227   @type disk: L{objects.Disk}
1228   @param disk: the description of the disk we should
1229       shutdown
1230   @rtype: boolean
1231   @return: the success of the operation
1232
1233   """
1234   r_dev = _RecursiveFindBD(disk)
1235   if r_dev is not None:
1236     r_path = r_dev.dev_path
1237     result = r_dev.Shutdown()
1238     if result:
1239       DevCacheManager.RemoveCache(r_path)
1240   else:
1241     result = True
1242   if disk.children:
1243     for child in disk.children:
1244       result = result and ShutdownBlockDevice(child)
1245   return result
1246
1247
1248 def MirrorAddChildren(parent_cdev, new_cdevs):
1249   """Extend a mirrored block device.
1250
1251   @type parent_cdev: L{objects.Disk}
1252   @param parent_cdev: the disk to which we should add children
1253   @type new_cdevs: list of L{objects.Disk}
1254   @param new_cdevs: the list of children which we should add
1255   @rtype: boolean
1256   @return: the success of the operation
1257
1258   """
1259   parent_bdev = _RecursiveFindBD(parent_cdev)
1260   if parent_bdev is None:
1261     logging.error("Can't find parent device")
1262     return False
1263   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1264   if new_bdevs.count(None) > 0:
1265     logging.error("Can't find new device(s) to add: %s:%s",
1266                   new_bdevs, new_cdevs)
1267     return False
1268   parent_bdev.AddChildren(new_bdevs)
1269   return True
1270
1271
1272 def MirrorRemoveChildren(parent_cdev, new_cdevs):
1273   """Shrink a mirrored block device.
1274
1275   @type parent_cdev: L{objects.Disk}
1276   @param parent_cdev: the disk from which we should remove children
1277   @type new_cdevs: list of L{objects.Disk}
1278   @param new_cdevs: the list of children which we should remove
1279   @rtype: boolean
1280   @return: the success of the operation
1281
1282   """
1283   parent_bdev = _RecursiveFindBD(parent_cdev)
1284   if parent_bdev is None:
1285     logging.error("Can't find parent in remove children: %s", parent_cdev)
1286     return False
1287   devs = []
1288   for disk in new_cdevs:
1289     rpath = disk.StaticDevPath()
1290     if rpath is None:
1291       bd = _RecursiveFindBD(disk)
1292       if bd is None:
1293         logging.error("Can't find dynamic device %s while removing children",
1294                       disk)
1295         return False
1296       else:
1297         devs.append(bd.dev_path)
1298     else:
1299       devs.append(rpath)
1300   parent_bdev.RemoveChildren(devs)
1301   return True
1302
1303
1304 def GetMirrorStatus(disks):
1305   """Get the mirroring status of a list of devices.
1306
1307   @type disks: list of L{objects.Disk}
1308   @param disks: the list of disks which we should query
1309   @rtype: disk
1310   @return:
1311       a list of (mirror_done, estimated_time) tuples, which
1312       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1313   @raise errors.BlockDeviceError: if any of the disks cannot be
1314       found
1315
1316   """
1317   stats = []
1318   for dsk in disks:
1319     rbd = _RecursiveFindBD(dsk)
1320     if rbd is None:
1321       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1322     stats.append(rbd.CombinedSyncStatus())
1323   return stats
1324
1325
1326 def _RecursiveFindBD(disk):
1327   """Check if a device is activated.
1328
1329   If so, return informations about the real device.
1330
1331   @type disk: L{objects.Disk}
1332   @param disk: the disk object we need to find
1333
1334   @return: None if the device can't be found,
1335       otherwise the device instance
1336
1337   """
1338   children = []
1339   if disk.children:
1340     for chdisk in disk.children:
1341       children.append(_RecursiveFindBD(chdisk))
1342
1343   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1344
1345
1346 def FindBlockDevice(disk):
1347   """Check if a device is activated.
1348
1349   If it is, return informations about the real device.
1350
1351   @type disk: L{objects.Disk}
1352   @param disk: the disk to find
1353   @rtype: None or tuple
1354   @return: None if the disk cannot be found, otherwise a
1355       tuple (device_path, major, minor, sync_percent,
1356       estimated_time, is_degraded)
1357
1358   """
1359   rbd = _RecursiveFindBD(disk)
1360   if rbd is None:
1361     return rbd
1362   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1363
1364
1365 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1366   """Write a file to the filesystem.
1367
1368   This allows the master to overwrite(!) a file. It will only perform
1369   the operation if the file belongs to a list of configuration files.
1370
1371   @type file_name: str
1372   @param file_name: the target file name
1373   @type data: str
1374   @param data: the new contents of the file
1375   @type mode: int
1376   @param mode: the mode to give the file (can be None)
1377   @type uid: int
1378   @param uid: the owner of the file (can be -1 for default)
1379   @type gid: int
1380   @param gid: the group of the file (can be -1 for default)
1381   @type atime: float
1382   @param atime: the atime to set on the file (can be None)
1383   @type mtime: float
1384   @param mtime: the mtime to set on the file (can be None)
1385   @rtype: boolean
1386   @return: the success of the operation; errors are logged
1387       in the node daemon log
1388
1389   """
1390   if not os.path.isabs(file_name):
1391     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1392                   file_name)
1393     return False
1394
1395   allowed_files = [
1396     constants.CLUSTER_CONF_FILE,
1397     constants.ETC_HOSTS,
1398     constants.SSH_KNOWN_HOSTS_FILE,
1399     constants.VNC_PASSWORD_FILE,
1400     ]
1401
1402   if file_name not in allowed_files:
1403     logging.error("Filename passed to UploadFile not in allowed"
1404                  " upload targets: '%s'", file_name)
1405     return False
1406
1407   raw_data = _Decompress(data)
1408
1409   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1410                   atime=atime, mtime=mtime)
1411   return True
1412
1413
1414 def WriteSsconfFiles(values):
1415   """Update all ssconf files.
1416
1417   Wrapper around the SimpleStore.WriteFiles.
1418
1419   """
1420   ssconf.SimpleStore().WriteFiles(values)
1421
1422
1423 def _ErrnoOrStr(err):
1424   """Format an EnvironmentError exception.
1425
1426   If the L{err} argument has an errno attribute, it will be looked up
1427   and converted into a textual C{E...} description. Otherwise the
1428   string representation of the error will be returned.
1429
1430   @type err: L{EnvironmentError}
1431   @param err: the exception to format
1432
1433   """
1434   if hasattr(err, 'errno'):
1435     detail = errno.errorcode[err.errno]
1436   else:
1437     detail = str(err)
1438   return detail
1439
1440
1441 def _OSOndiskVersion(name, os_dir):
1442   """Compute and return the API version of a given OS.
1443
1444   This function will try to read the API version of the OS given by
1445   the 'name' parameter and residing in the 'os_dir' directory.
1446
1447   @type name: str
1448   @param name: the OS name we should look for
1449   @type os_dir: str
1450   @param os_dir: the directory inwhich we should look for the OS
1451   @rtype: int or None
1452   @return:
1453       Either an integer denoting the version or None in the
1454       case when this is not a valid OS name.
1455   @raise errors.InvalidOS: if the OS cannot be found
1456
1457   """
1458   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1459
1460   try:
1461     st = os.stat(api_file)
1462   except EnvironmentError, err:
1463     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1464                            " found (%s)" % _ErrnoOrStr(err))
1465
1466   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1467     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1468                            " a regular file")
1469
1470   try:
1471     f = open(api_file)
1472     try:
1473       api_versions = f.readlines()
1474     finally:
1475       f.close()
1476   except EnvironmentError, err:
1477     raise errors.InvalidOS(name, os_dir, "error while reading the"
1478                            " API version (%s)" % _ErrnoOrStr(err))
1479
1480   api_versions = [version.strip() for version in api_versions]
1481   try:
1482     api_versions = [int(version) for version in api_versions]
1483   except (TypeError, ValueError), err:
1484     raise errors.InvalidOS(name, os_dir,
1485                            "API version is not integer (%s)" % str(err))
1486
1487   return api_versions
1488
1489
1490 def DiagnoseOS(top_dirs=None):
1491   """Compute the validity for all OSes.
1492
1493   @type top_dirs: list
1494   @param top_dirs: the list of directories in which to
1495       search (if not given defaults to
1496       L{constants.OS_SEARCH_PATH})
1497   @rtype: list of L{objects.OS}
1498   @return: an OS object for each name in all the given
1499       directories
1500
1501   """
1502   if top_dirs is None:
1503     top_dirs = constants.OS_SEARCH_PATH
1504
1505   result = []
1506   for dir_name in top_dirs:
1507     if os.path.isdir(dir_name):
1508       try:
1509         f_names = utils.ListVisibleFiles(dir_name)
1510       except EnvironmentError, err:
1511         logging.exception("Can't list the OS directory %s", dir_name)
1512         break
1513       for name in f_names:
1514         try:
1515           os_inst = OSFromDisk(name, base_dir=dir_name)
1516           result.append(os_inst)
1517         except errors.InvalidOS, err:
1518           result.append(objects.OS.FromInvalidOS(err))
1519
1520   return result
1521
1522
1523 def OSFromDisk(name, base_dir=None):
1524   """Create an OS instance from disk.
1525
1526   This function will return an OS instance if the given name is a
1527   valid OS name. Otherwise, it will raise an appropriate
1528   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1529
1530   @type base_dir: string
1531   @keyword base_dir: Base directory containing OS installations.
1532                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1533   @rtype: L{objects.OS}
1534   @return: the OS instance if we find a valid one
1535   @raise errors.InvalidOS: if we don't find a valid OS
1536
1537   """
1538   if base_dir is None:
1539     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1540     if os_dir is None:
1541       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1542   else:
1543     os_dir = os.path.sep.join([base_dir, name])
1544
1545   api_versions = _OSOndiskVersion(name, os_dir)
1546
1547   if constants.OS_API_VERSION not in api_versions:
1548     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1549                            " (found %s want %s)"
1550                            % (api_versions, constants.OS_API_VERSION))
1551
1552   # OS Scripts dictionary, we will populate it with the actual script names
1553   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1554
1555   for script in os_scripts:
1556     os_scripts[script] = os.path.sep.join([os_dir, script])
1557
1558     try:
1559       st = os.stat(os_scripts[script])
1560     except EnvironmentError, err:
1561       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1562                              (script, _ErrnoOrStr(err)))
1563
1564     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1565       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1566                              script)
1567
1568     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1569       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1570                              script)
1571
1572
1573   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1574                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1575                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1576                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1577                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1578                     api_versions=api_versions)
1579
1580 def OSEnvironment(instance, debug=0):
1581   """Calculate the environment for an os script.
1582
1583   @type instance: L{objects.Instance}
1584   @param instance: target instance for the os script run
1585   @type debug: integer
1586   @param debug: debug level (0 or 1, for OS Api 10)
1587   @rtype: dict
1588   @return: dict of environment variables
1589   @raise errors.BlockDeviceError: if the block device
1590       cannot be found
1591
1592   """
1593   result = {}
1594   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1595   result['INSTANCE_NAME'] = instance.name
1596   result['HYPERVISOR'] = instance.hypervisor
1597   result['DISK_COUNT'] = '%d' % len(instance.disks)
1598   result['NIC_COUNT'] = '%d' % len(instance.nics)
1599   result['DEBUG_LEVEL'] = '%d' % debug
1600   for idx, disk in enumerate(instance.disks):
1601     real_disk = _RecursiveFindBD(disk)
1602     if real_disk is None:
1603       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1604                                     str(disk))
1605     real_disk.Open()
1606     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1607     # FIXME: When disks will have read-only mode, populate this
1608     result['DISK_%d_ACCESS' % idx] = 'W'
1609     if constants.HV_DISK_TYPE in instance.hvparams:
1610       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1611         instance.hvparams[constants.HV_DISK_TYPE]
1612     if disk.dev_type in constants.LDS_BLOCK:
1613       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1614     elif disk.dev_type == constants.LD_FILE:
1615       result['DISK_%d_BACKEND_TYPE' % idx] = \
1616         'file:%s' % disk.physical_id[0]
1617   for idx, nic in enumerate(instance.nics):
1618     result['NIC_%d_MAC' % idx] = nic.mac
1619     if nic.ip:
1620       result['NIC_%d_IP' % idx] = nic.ip
1621     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1622     if constants.HV_NIC_TYPE in instance.hvparams:
1623       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1624         instance.hvparams[constants.HV_NIC_TYPE]
1625
1626   return result
1627
1628 def GrowBlockDevice(disk, amount):
1629   """Grow a stack of block devices.
1630
1631   This function is called recursively, with the childrens being the
1632   first ones to resize.
1633
1634   @type disk: L{objects.Disk}
1635   @param disk: the disk to be grown
1636   @rtype: (status, result)
1637   @return: a tuple with the status of the operation
1638       (True/False), and the errors message if status
1639       is False
1640
1641   """
1642   r_dev = _RecursiveFindBD(disk)
1643   if r_dev is None:
1644     return False, "Cannot find block device %s" % (disk,)
1645
1646   try:
1647     r_dev.Grow(amount)
1648   except errors.BlockDeviceError, err:
1649     return False, str(err)
1650
1651   return True, None
1652
1653
1654 def SnapshotBlockDevice(disk):
1655   """Create a snapshot copy of a block device.
1656
1657   This function is called recursively, and the snapshot is actually created
1658   just for the leaf lvm backend device.
1659
1660   @type disk: L{objects.Disk}
1661   @param disk: the disk to be snapshotted
1662   @rtype: string
1663   @return: snapshot disk path
1664
1665   """
1666   if disk.children:
1667     if len(disk.children) == 1:
1668       # only one child, let's recurse on it
1669       return SnapshotBlockDevice(disk.children[0])
1670     else:
1671       # more than one child, choose one that matches
1672       for child in disk.children:
1673         if child.size == disk.size:
1674           # return implies breaking the loop
1675           return SnapshotBlockDevice(child)
1676   elif disk.dev_type == constants.LD_LV:
1677     r_dev = _RecursiveFindBD(disk)
1678     if r_dev is not None:
1679       # let's stay on the safe side and ask for the full size, for now
1680       return r_dev.Snapshot(disk.size)
1681     else:
1682       return None
1683   else:
1684     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1685                                  " '%s' of type '%s'" %
1686                                  (disk.unique_id, disk.dev_type))
1687
1688
1689 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1690   """Export a block device snapshot to a remote node.
1691
1692   @type disk: L{objects.Disk}
1693   @param disk: the description of the disk to export
1694   @type dest_node: str
1695   @param dest_node: the destination node to export to
1696   @type instance: L{objects.Instance}
1697   @param instance: the instance object to whom the disk belongs
1698   @type cluster_name: str
1699   @param cluster_name: the cluster name, needed for SSH hostalias
1700   @type idx: int
1701   @param idx: the index of the disk in the instance's disk list,
1702       used to export to the OS scripts environment
1703   @rtype: boolean
1704   @return: the success of the operation
1705
1706   """
1707   export_env = OSEnvironment(instance)
1708
1709   inst_os = OSFromDisk(instance.os)
1710   export_script = inst_os.export_script
1711
1712   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1713                                      instance.name, int(time.time()))
1714   if not os.path.exists(constants.LOG_OS_DIR):
1715     os.mkdir(constants.LOG_OS_DIR, 0750)
1716   real_disk = _RecursiveFindBD(disk)
1717   if real_disk is None:
1718     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1719                                   str(disk))
1720   real_disk.Open()
1721
1722   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1723   export_env['EXPORT_INDEX'] = str(idx)
1724
1725   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1726   destfile = disk.physical_id[1]
1727
1728   # the target command is built out of three individual commands,
1729   # which are joined by pipes; we check each individual command for
1730   # valid parameters
1731   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1732                                export_script, logfile)
1733
1734   comprcmd = "gzip"
1735
1736   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1737                                 destdir, destdir, destfile)
1738   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1739                                                    constants.GANETI_RUNAS,
1740                                                    destcmd)
1741
1742   # all commands have been checked, so we're safe to combine them
1743   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1744
1745   result = utils.RunCmd(command, env=export_env)
1746
1747   if result.failed:
1748     logging.error("os snapshot export command '%s' returned error: %s"
1749                   " output: %s", command, result.fail_reason, result.output)
1750     return False
1751
1752   return True
1753
1754
1755 def FinalizeExport(instance, snap_disks):
1756   """Write out the export configuration information.
1757
1758   @type instance: L{objects.Instance}
1759   @param instance: the instance which we export, used for
1760       saving configuration
1761   @type snap_disks: list of L{objects.Disk}
1762   @param snap_disks: list of snapshot block devices, which
1763       will be used to get the actual name of the dump file
1764
1765   @rtype: boolean
1766   @return: the success of the operation
1767
1768   """
1769   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1770   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1771
1772   config = objects.SerializableConfigParser()
1773
1774   config.add_section(constants.INISECT_EXP)
1775   config.set(constants.INISECT_EXP, 'version', '0')
1776   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1777   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1778   config.set(constants.INISECT_EXP, 'os', instance.os)
1779   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1780
1781   config.add_section(constants.INISECT_INS)
1782   config.set(constants.INISECT_INS, 'name', instance.name)
1783   config.set(constants.INISECT_INS, 'memory', '%d' %
1784              instance.beparams[constants.BE_MEMORY])
1785   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1786              instance.beparams[constants.BE_VCPUS])
1787   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1788
1789   nic_total = 0
1790   for nic_count, nic in enumerate(instance.nics):
1791     nic_total += 1
1792     config.set(constants.INISECT_INS, 'nic%d_mac' %
1793                nic_count, '%s' % nic.mac)
1794     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1795     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1796                '%s' % nic.bridge)
1797   # TODO: redundant: on load can read nics until it doesn't exist
1798   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1799
1800   disk_total = 0
1801   for disk_count, disk in enumerate(snap_disks):
1802     if disk:
1803       disk_total += 1
1804       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1805                  ('%s' % disk.iv_name))
1806       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1807                  ('%s' % disk.physical_id[1]))
1808       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1809                  ('%d' % disk.size))
1810
1811   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1812
1813   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1814                   data=config.Dumps())
1815   shutil.rmtree(finaldestdir, True)
1816   shutil.move(destdir, finaldestdir)
1817
1818   return True
1819
1820
1821 def ExportInfo(dest):
1822   """Get export configuration information.
1823
1824   @type dest: str
1825   @param dest: directory containing the export
1826
1827   @rtype: L{objects.SerializableConfigParser}
1828   @return: a serializable config file containing the
1829       export info
1830
1831   """
1832   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1833
1834   config = objects.SerializableConfigParser()
1835   config.read(cff)
1836
1837   if (not config.has_section(constants.INISECT_EXP) or
1838       not config.has_section(constants.INISECT_INS)):
1839     return None
1840
1841   return config
1842
1843
1844 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1845   """Import an os image into an instance.
1846
1847   @type instance: L{objects.Instance}
1848   @param instance: instance to import the disks into
1849   @type src_node: string
1850   @param src_node: source node for the disk images
1851   @type src_images: list of string
1852   @param src_images: absolute paths of the disk images
1853   @rtype: list of boolean
1854   @return: each boolean represent the success of importing the n-th disk
1855
1856   """
1857   import_env = OSEnvironment(instance)
1858   inst_os = OSFromDisk(instance.os)
1859   import_script = inst_os.import_script
1860
1861   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1862                                         instance.name, int(time.time()))
1863   if not os.path.exists(constants.LOG_OS_DIR):
1864     os.mkdir(constants.LOG_OS_DIR, 0750)
1865
1866   comprcmd = "gunzip"
1867   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1868                                import_script, logfile)
1869
1870   final_result = []
1871   for idx, image in enumerate(src_images):
1872     if image:
1873       destcmd = utils.BuildShellCmd('cat %s', image)
1874       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1875                                                        constants.GANETI_RUNAS,
1876                                                        destcmd)
1877       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1878       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1879       import_env['IMPORT_INDEX'] = str(idx)
1880       result = utils.RunCmd(command, env=import_env)
1881       if result.failed:
1882         logging.error("Disk import command '%s' returned error: %s"
1883                       " output: %s", command, result.fail_reason,
1884                       result.output)
1885         final_result.append(False)
1886       else:
1887         final_result.append(True)
1888     else:
1889       final_result.append(True)
1890
1891   return final_result
1892
1893
1894 def ListExports():
1895   """Return a list of exports currently available on this machine.
1896
1897   @rtype: list
1898   @return: list of the exports
1899
1900   """
1901   if os.path.isdir(constants.EXPORT_DIR):
1902     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1903   else:
1904     return []
1905
1906
1907 def RemoveExport(export):
1908   """Remove an existing export from the node.
1909
1910   @type export: str
1911   @param export: the name of the export to remove
1912   @rtype: boolean
1913   @return: the success of the operation
1914
1915   """
1916   target = os.path.join(constants.EXPORT_DIR, export)
1917
1918   shutil.rmtree(target)
1919   # TODO: catch some of the relevant exceptions and provide a pretty
1920   # error message if rmtree fails.
1921
1922   return True
1923
1924
1925 def RenameBlockDevices(devlist):
1926   """Rename a list of block devices.
1927
1928   @type devlist: list of tuples
1929   @param devlist: list of tuples of the form  (disk,
1930       new_logical_id, new_physical_id); disk is an
1931       L{objects.Disk} object describing the current disk,
1932       and new logical_id/physical_id is the name we
1933       rename it to
1934   @rtype: boolean
1935   @return: True if all renames succeeded, False otherwise
1936
1937   """
1938   result = True
1939   for disk, unique_id in devlist:
1940     dev = _RecursiveFindBD(disk)
1941     if dev is None:
1942       result = False
1943       continue
1944     try:
1945       old_rpath = dev.dev_path
1946       dev.Rename(unique_id)
1947       new_rpath = dev.dev_path
1948       if old_rpath != new_rpath:
1949         DevCacheManager.RemoveCache(old_rpath)
1950         # FIXME: we should add the new cache information here, like:
1951         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1952         # but we don't have the owner here - maybe parse from existing
1953         # cache? for now, we only lose lvm data when we rename, which
1954         # is less critical than DRBD or MD
1955     except errors.BlockDeviceError, err:
1956       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1957       result = False
1958   return result
1959
1960
1961 def _TransformFileStorageDir(file_storage_dir):
1962   """Checks whether given file_storage_dir is valid.
1963
1964   Checks wheter the given file_storage_dir is within the cluster-wide
1965   default file_storage_dir stored in SimpleStore. Only paths under that
1966   directory are allowed.
1967
1968   @type file_storage_dir: str
1969   @param file_storage_dir: the path to check
1970
1971   @return: the normalized path if valid, None otherwise
1972
1973   """
1974   cfg = _GetConfig()
1975   file_storage_dir = os.path.normpath(file_storage_dir)
1976   base_file_storage_dir = cfg.GetFileStorageDir()
1977   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1978       base_file_storage_dir):
1979     logging.error("file storage directory '%s' is not under base file"
1980                   " storage directory '%s'",
1981                   file_storage_dir, base_file_storage_dir)
1982     return None
1983   return file_storage_dir
1984
1985
1986 def CreateFileStorageDir(file_storage_dir):
1987   """Create file storage directory.
1988
1989   @type file_storage_dir: str
1990   @param file_storage_dir: directory to create
1991
1992   @rtype: tuple
1993   @return: tuple with first element a boolean indicating wheter dir
1994       creation was successful or not
1995
1996   """
1997   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1998   result = True,
1999   if not file_storage_dir:
2000     result = False,
2001   else:
2002     if os.path.exists(file_storage_dir):
2003       if not os.path.isdir(file_storage_dir):
2004         logging.error("'%s' is not a directory", file_storage_dir)
2005         result = False,
2006     else:
2007       try:
2008         os.makedirs(file_storage_dir, 0750)
2009       except OSError, err:
2010         logging.error("Cannot create file storage directory '%s': %s",
2011                       file_storage_dir, err)
2012         result = False,
2013   return result
2014
2015
2016 def RemoveFileStorageDir(file_storage_dir):
2017   """Remove file storage directory.
2018
2019   Remove it only if it's empty. If not log an error and return.
2020
2021   @type file_storage_dir: str
2022   @param file_storage_dir: the directory we should cleanup
2023   @rtype: tuple (success,)
2024   @return: tuple of one element, C{success}, denoting
2025       whether the operation was successfull
2026
2027   """
2028   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2029   result = True,
2030   if not file_storage_dir:
2031     result = False,
2032   else:
2033     if os.path.exists(file_storage_dir):
2034       if not os.path.isdir(file_storage_dir):
2035         logging.error("'%s' is not a directory", file_storage_dir)
2036         result = False,
2037       # deletes dir only if empty, otherwise we want to return False
2038       try:
2039         os.rmdir(file_storage_dir)
2040       except OSError, err:
2041         logging.exception("Cannot remove file storage directory '%s'",
2042                           file_storage_dir)
2043         result = False,
2044   return result
2045
2046
2047 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2048   """Rename the file storage directory.
2049
2050   @type old_file_storage_dir: str
2051   @param old_file_storage_dir: the current path
2052   @type new_file_storage_dir: str
2053   @param new_file_storage_dir: the name we should rename to
2054   @rtype: tuple (success,)
2055   @return: tuple of one element, C{success}, denoting
2056       whether the operation was successful
2057
2058   """
2059   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2060   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2061   result = True,
2062   if not old_file_storage_dir or not new_file_storage_dir:
2063     result = False,
2064   else:
2065     if not os.path.exists(new_file_storage_dir):
2066       if os.path.isdir(old_file_storage_dir):
2067         try:
2068           os.rename(old_file_storage_dir, new_file_storage_dir)
2069         except OSError, err:
2070           logging.exception("Cannot rename '%s' to '%s'",
2071                             old_file_storage_dir, new_file_storage_dir)
2072           result =  False,
2073       else:
2074         logging.error("'%s' is not a directory", old_file_storage_dir)
2075         result = False,
2076     else:
2077       if os.path.exists(old_file_storage_dir):
2078         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2079                       old_file_storage_dir, new_file_storage_dir)
2080         result = False,
2081   return result
2082
2083
2084 def _IsJobQueueFile(file_name):
2085   """Checks whether the given filename is in the queue directory.
2086
2087   @type file_name: str
2088   @param file_name: the file name we should check
2089   @rtype: boolean
2090   @return: whether the file is under the queue directory
2091
2092   """
2093   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2094   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2095
2096   if not result:
2097     logging.error("'%s' is not a file in the queue directory",
2098                   file_name)
2099
2100   return result
2101
2102
2103 def JobQueueUpdate(file_name, content):
2104   """Updates a file in the queue directory.
2105
2106   This is just a wrapper over L{utils.WriteFile}, with proper
2107   checking.
2108
2109   @type file_name: str
2110   @param file_name: the job file name
2111   @type content: str
2112   @param content: the new job contents
2113   @rtype: boolean
2114   @return: the success of the operation
2115
2116   """
2117   if not _IsJobQueueFile(file_name):
2118     return False
2119
2120   # Write and replace the file atomically
2121   utils.WriteFile(file_name, data=_Decompress(content))
2122
2123   return True
2124
2125
2126 def JobQueueRename(old, new):
2127   """Renames a job queue file.
2128
2129   This is just a wrapper over os.rename with proper checking.
2130
2131   @type old: str
2132   @param old: the old (actual) file name
2133   @type new: str
2134   @param new: the desired file name
2135   @rtype: boolean
2136   @return: the success of the operation
2137
2138   """
2139   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2140     return False
2141
2142   utils.RenameFile(old, new, mkdir=True)
2143
2144   return True
2145
2146
2147 def JobQueueSetDrainFlag(drain_flag):
2148   """Set the drain flag for the queue.
2149
2150   This will set or unset the queue drain flag.
2151
2152   @type drain_flag: boolean
2153   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2154   @rtype: boolean
2155   @return: always True
2156   @warning: the function always returns True
2157
2158   """
2159   if drain_flag:
2160     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2161   else:
2162     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2163
2164   return True
2165
2166
2167 def CloseBlockDevices(instance_name, disks):
2168   """Closes the given block devices.
2169
2170   This means they will be switched to secondary mode (in case of
2171   DRBD).
2172
2173   @param instance_name: if the argument is not empty, the symlinks
2174       of this instance will be removed
2175   @type disks: list of L{objects.Disk}
2176   @param disks: the list of disks to be closed
2177   @rtype: tuple (success, message)
2178   @return: a tuple of success and message, where success
2179       indicates the succes of the operation, and message
2180       which will contain the error details in case we
2181       failed
2182
2183   """
2184   bdevs = []
2185   for cf in disks:
2186     rd = _RecursiveFindBD(cf)
2187     if rd is None:
2188       return (False, "Can't find device %s" % cf)
2189     bdevs.append(rd)
2190
2191   msg = []
2192   for rd in bdevs:
2193     try:
2194       rd.Close()
2195     except errors.BlockDeviceError, err:
2196       msg.append(str(err))
2197   if msg:
2198     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2199   else:
2200     if instance_name:
2201       _RemoveBlockDevLinks(instance_name, disks)
2202     return (True, "All devices secondary")
2203
2204
2205 def ValidateHVParams(hvname, hvparams):
2206   """Validates the given hypervisor parameters.
2207
2208   @type hvname: string
2209   @param hvname: the hypervisor name
2210   @type hvparams: dict
2211   @param hvparams: the hypervisor parameters to be validated
2212   @rtype: tuple (success, message)
2213   @return: a tuple of success and message, where success
2214       indicates the succes of the operation, and message
2215       which will contain the error details in case we
2216       failed
2217
2218   """
2219   try:
2220     hv_type = hypervisor.GetHypervisor(hvname)
2221     hv_type.ValidateParameters(hvparams)
2222     return (True, "Validation passed")
2223   except errors.HypervisorError, err:
2224     return (False, str(err))
2225
2226
2227 def DemoteFromMC():
2228   """Demotes the current node from master candidate role.
2229
2230   """
2231   # try to ensure we're not the master by mistake
2232   master, myself = ssconf.GetMasterAndMyself()
2233   if master == myself:
2234     return (False, "ssconf status shows I'm the master node, will not demote")
2235   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2236   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2237     return (False, "The master daemon is running, will not demote")
2238   try:
2239     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2240   except EnvironmentError, err:
2241     if err.errno != errno.ENOENT:
2242       return (False, "Error while backing up cluster file: %s" % str(err))
2243   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2244   return (True, "Done")
2245
2246
2247 def _FindDisks(nodes_ip, disks):
2248   """Sets the physical ID on disks and returns the block devices.
2249
2250   """
2251   # set the correct physical ID
2252   my_name = utils.HostInfo().name
2253   for cf in disks:
2254     cf.SetPhysicalID(my_name, nodes_ip)
2255
2256   bdevs = []
2257
2258   for cf in disks:
2259     rd = _RecursiveFindBD(cf)
2260     if rd is None:
2261       return (False, "Can't find device %s" % cf)
2262     bdevs.append(rd)
2263   return (True, bdevs)
2264
2265
2266 def DrbdDisconnectNet(nodes_ip, disks):
2267   """Disconnects the network on a list of drbd devices.
2268
2269   """
2270   status, bdevs = _FindDisks(nodes_ip, disks)
2271   if not status:
2272     return status, bdevs
2273
2274   # disconnect disks
2275   for rd in bdevs:
2276     try:
2277       rd.DisconnectNet()
2278     except errors.BlockDeviceError, err:
2279       logging.exception("Failed to go into standalone mode")
2280       return (False, "Can't change network configuration: %s" % str(err))
2281   return (True, "All disks are now disconnected")
2282
2283
2284 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2285   """Attaches the network on a list of drbd devices.
2286
2287   """
2288   status, bdevs = _FindDisks(nodes_ip, disks)
2289   if not status:
2290     return status, bdevs
2291
2292   if multimaster:
2293     for idx, rd in enumerate(bdevs):
2294       try:
2295         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2296       except EnvironmentError, err:
2297         return (False, "Can't create symlink: %s" % str(err))
2298   # reconnect disks, switch to new master configuration and if
2299   # needed primary mode
2300   for rd in bdevs:
2301     try:
2302       rd.AttachNet(multimaster)
2303     except errors.BlockDeviceError, err:
2304       return (False, "Can't change network configuration: %s" % str(err))
2305   # wait until the disks are connected; we need to retry the re-attach
2306   # if the device becomes standalone, as this might happen if the one
2307   # node disconnects and reconnects in a different mode before the
2308   # other node reconnects; in this case, one or both of the nodes will
2309   # decide it has wrong configuration and switch to standalone
2310   RECONNECT_TIMEOUT = 2 * 60
2311   sleep_time = 0.100 # start with 100 miliseconds
2312   timeout_limit = time.time() + RECONNECT_TIMEOUT
2313   while time.time() < timeout_limit:
2314     all_connected = True
2315     for rd in bdevs:
2316       stats = rd.GetProcStatus()
2317       if not (stats.is_connected or stats.is_in_resync):
2318         all_connected = False
2319       if stats.is_standalone:
2320         # peer had different config info and this node became
2321         # standalone, even though this should not happen with the
2322         # new staged way of changing disk configs
2323         try:
2324           rd.ReAttachNet(multimaster)
2325         except errors.BlockDeviceError, err:
2326           return (False, "Can't change network configuration: %s" % str(err))
2327     if all_connected:
2328       break
2329     time.sleep(sleep_time)
2330     sleep_time = min(5, sleep_time * 1.5)
2331   if not all_connected:
2332     return (False, "Timeout in disk reconnecting")
2333   if multimaster:
2334     # change to primary mode
2335     for rd in bdevs:
2336       rd.Open()
2337   if multimaster:
2338     msg = "multi-master and primary"
2339   else:
2340     msg = "single-master"
2341   return (True, "Disks are now configured as %s" % msg)
2342
2343
2344 def DrbdWaitSync(nodes_ip, disks):
2345   """Wait until DRBDs have synchronized.
2346
2347   """
2348   status, bdevs = _FindDisks(nodes_ip, disks)
2349   if not status:
2350     return status, bdevs
2351
2352   min_resync = 100
2353   alldone = True
2354   failure = False
2355   for rd in bdevs:
2356     stats = rd.GetProcStatus()
2357     if not (stats.is_connected or stats.is_in_resync):
2358       failure = True
2359       break
2360     alldone = alldone and (not stats.is_in_resync)
2361     if stats.sync_percent is not None:
2362       min_resync = min(min_resync, stats.sync_percent)
2363   return (not failure, (alldone, min_resync))
2364
2365
2366 class HooksRunner(object):
2367   """Hook runner.
2368
2369   This class is instantiated on the node side (ganeti-noded) and not
2370   on the master side.
2371
2372   """
2373   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2374
2375   def __init__(self, hooks_base_dir=None):
2376     """Constructor for hooks runner.
2377
2378     @type hooks_base_dir: str or None
2379     @param hooks_base_dir: if not None, this overrides the
2380         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2381
2382     """
2383     if hooks_base_dir is None:
2384       hooks_base_dir = constants.HOOKS_BASE_DIR
2385     self._BASE_DIR = hooks_base_dir
2386
2387   @staticmethod
2388   def ExecHook(script, env):
2389     """Exec one hook script.
2390
2391     @type script: str
2392     @param script: the full path to the script
2393     @type env: dict
2394     @param env: the environment with which to exec the script
2395     @rtype: tuple (success, message)
2396     @return: a tuple of success and message, where success
2397         indicates the succes of the operation, and message
2398         which will contain the error details in case we
2399         failed
2400
2401     """
2402     # exec the process using subprocess and log the output
2403     fdstdin = None
2404     try:
2405       fdstdin = open("/dev/null", "r")
2406       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2407                                stderr=subprocess.STDOUT, close_fds=True,
2408                                shell=False, cwd="/", env=env)
2409       output = ""
2410       try:
2411         output = child.stdout.read(4096)
2412         child.stdout.close()
2413       except EnvironmentError, err:
2414         output += "Hook script error: %s" % str(err)
2415
2416       while True:
2417         try:
2418           result = child.wait()
2419           break
2420         except EnvironmentError, err:
2421           if err.errno == errno.EINTR:
2422             continue
2423           raise
2424     finally:
2425       # try not to leak fds
2426       for fd in (fdstdin, ):
2427         if fd is not None:
2428           try:
2429             fd.close()
2430           except EnvironmentError, err:
2431             # just log the error
2432             #logging.exception("Error while closing fd %s", fd)
2433             pass
2434
2435     return result == 0, output
2436
2437   def RunHooks(self, hpath, phase, env):
2438     """Run the scripts in the hooks directory.
2439
2440     @type hpath: str
2441     @param hpath: the path to the hooks directory which
2442         holds the scripts
2443     @type phase: str
2444     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2445         L{constants.HOOKS_PHASE_POST}
2446     @type env: dict
2447     @param env: dictionary with the environment for the hook
2448     @rtype: list
2449     @return: list of 3-element tuples:
2450       - script path
2451       - script result, either L{constants.HKR_SUCCESS} or
2452         L{constants.HKR_FAIL}
2453       - output of the script
2454
2455     @raise errors.ProgrammerError: for invalid input
2456         parameters
2457
2458     """
2459     if phase == constants.HOOKS_PHASE_PRE:
2460       suffix = "pre"
2461     elif phase == constants.HOOKS_PHASE_POST:
2462       suffix = "post"
2463     else:
2464       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2465     rr = []
2466
2467     subdir = "%s-%s.d" % (hpath, suffix)
2468     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2469     try:
2470       dir_contents = utils.ListVisibleFiles(dir_name)
2471     except OSError, err:
2472       # FIXME: must log output in case of failures
2473       return rr
2474
2475     # we use the standard python sort order,
2476     # so 00name is the recommended naming scheme
2477     dir_contents.sort()
2478     for relname in dir_contents:
2479       fname = os.path.join(dir_name, relname)
2480       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2481           self.RE_MASK.match(relname) is not None):
2482         rrval = constants.HKR_SKIP
2483         output = ""
2484       else:
2485         result, output = self.ExecHook(fname, env)
2486         if not result:
2487           rrval = constants.HKR_FAIL
2488         else:
2489           rrval = constants.HKR_SUCCESS
2490       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2491
2492     return rr
2493
2494
2495 class IAllocatorRunner(object):
2496   """IAllocator runner.
2497
2498   This class is instantiated on the node side (ganeti-noded) and not on
2499   the master side.
2500
2501   """
2502   def Run(self, name, idata):
2503     """Run an iallocator script.
2504
2505     @type name: str
2506     @param name: the iallocator script name
2507     @type idata: str
2508     @param idata: the allocator input data
2509
2510     @rtype: tuple
2511     @return: four element tuple of:
2512        - run status (one of the IARUN_ constants)
2513        - stdout
2514        - stderr
2515        - fail reason (as from L{utils.RunResult})
2516
2517     """
2518     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2519                                   os.path.isfile)
2520     if alloc_script is None:
2521       return (constants.IARUN_NOTFOUND, None, None, None)
2522
2523     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2524     try:
2525       os.write(fd, idata)
2526       os.close(fd)
2527       result = utils.RunCmd([alloc_script, fin_name])
2528       if result.failed:
2529         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2530                 result.fail_reason)
2531     finally:
2532       os.unlink(fin_name)
2533
2534     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2535
2536
2537 class DevCacheManager(object):
2538   """Simple class for managing a cache of block device information.
2539
2540   """
2541   _DEV_PREFIX = "/dev/"
2542   _ROOT_DIR = constants.BDEV_CACHE_DIR
2543
2544   @classmethod
2545   def _ConvertPath(cls, dev_path):
2546     """Converts a /dev/name path to the cache file name.
2547
2548     This replaces slashes with underscores and strips the /dev
2549     prefix. It then returns the full path to the cache file.
2550
2551     @type dev_path: str
2552     @param dev_path: the C{/dev/} path name
2553     @rtype: str
2554     @return: the converted path name
2555
2556     """
2557     if dev_path.startswith(cls._DEV_PREFIX):
2558       dev_path = dev_path[len(cls._DEV_PREFIX):]
2559     dev_path = dev_path.replace("/", "_")
2560     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2561     return fpath
2562
2563   @classmethod
2564   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2565     """Updates the cache information for a given device.
2566
2567     @type dev_path: str
2568     @param dev_path: the pathname of the device
2569     @type owner: str
2570     @param owner: the owner (instance name) of the device
2571     @type on_primary: bool
2572     @param on_primary: whether this is the primary
2573         node nor not
2574     @type iv_name: str
2575     @param iv_name: the instance-visible name of the
2576         device, as in objects.Disk.iv_name
2577
2578     @rtype: None
2579
2580     """
2581     if dev_path is None:
2582       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2583       return
2584     fpath = cls._ConvertPath(dev_path)
2585     if on_primary:
2586       state = "primary"
2587     else:
2588       state = "secondary"
2589     if iv_name is None:
2590       iv_name = "not_visible"
2591     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2592     try:
2593       utils.WriteFile(fpath, data=fdata)
2594     except EnvironmentError, err:
2595       logging.exception("Can't update bdev cache for %s", dev_path)
2596
2597   @classmethod
2598   def RemoveCache(cls, dev_path):
2599     """Remove data for a dev_path.
2600
2601     This is just a wrapper over L{utils.RemoveFile} with a converted
2602     path name and logging.
2603
2604     @type dev_path: str
2605     @param dev_path: the pathname of the device
2606
2607     @rtype: None
2608
2609     """
2610     if dev_path is None:
2611       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2612       return
2613     fpath = cls._ConvertPath(dev_path)
2614     try:
2615       utils.RemoveFile(fpath)
2616     except EnvironmentError, err:
2617       logging.exception("Can't update bdev cache for %s", dev_path)