Fix the _RemoveBlockDevLinks() function
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     logging.exception("Error while processing user ssh files")
264     return False
265
266   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
267     utils.WriteFile(name, data=content, mode=0600)
268
269   utils.AddAuthorizedKey(auth_keys, sshpub)
270
271   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
272
273   return True
274
275
276 def LeaveCluster():
277   """Cleans up and remove the current node.
278
279   This function cleans up and prepares the current node to be removed
280   from the cluster.
281
282   If processing is successful, then it raises an
283   L{errors.QuitGanetiException} which is used as a special case to
284   shutdown the node daemon.
285
286   """
287   _CleanDirectory(constants.DATA_DIR)
288   JobQueuePurge()
289
290   try:
291     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
292   except errors.OpExecError:
293     logging.exception("Error while processing ssh files")
294     return
295
296   f = open(pub_key, 'r')
297   try:
298     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
299   finally:
300     f.close()
301
302   utils.RemoveFile(priv_key)
303   utils.RemoveFile(pub_key)
304
305   # Return a reassuring string to the caller, and quit
306   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
307
308
309 def GetNodeInfo(vgname, hypervisor_type):
310   """Gives back a hash with different informations about the node.
311
312   @type vgname: C{string}
313   @param vgname: the name of the volume group to ask for disk space information
314   @type hypervisor_type: C{str}
315   @param hypervisor_type: the name of the hypervisor to ask for
316       memory information
317   @rtype: C{dict}
318   @return: dictionary with the following keys:
319       - vg_size is the size of the configured volume group in MiB
320       - vg_free is the free size of the volume group in MiB
321       - memory_dom0 is the memory allocated for domain0 in MiB
322       - memory_free is the currently available (free) ram in MiB
323       - memory_total is the total number of ram in MiB
324
325   """
326   outputarray = {}
327   vginfo = _GetVGInfo(vgname)
328   outputarray['vg_size'] = vginfo['vg_size']
329   outputarray['vg_free'] = vginfo['vg_free']
330
331   hyper = hypervisor.GetHypervisor(hypervisor_type)
332   hyp_info = hyper.GetNodeInfo()
333   if hyp_info is not None:
334     outputarray.update(hyp_info)
335
336   f = open("/proc/sys/kernel/random/boot_id", 'r')
337   try:
338     outputarray["bootid"] = f.read(128).rstrip("\n")
339   finally:
340     f.close()
341
342   return outputarray
343
344
345 def VerifyNode(what, cluster_name):
346   """Verify the status of the local node.
347
348   Based on the input L{what} parameter, various checks are done on the
349   local node.
350
351   If the I{filelist} key is present, this list of
352   files is checksummed and the file/checksum pairs are returned.
353
354   If the I{nodelist} key is present, we check that we have
355   connectivity via ssh with the target nodes (and check the hostname
356   report).
357
358   If the I{node-net-test} key is present, we check that we have
359   connectivity to the given nodes via both primary IP and, if
360   applicable, secondary IPs.
361
362   @type what: C{dict}
363   @param what: a dictionary of things to check:
364       - filelist: list of files for which to compute checksums
365       - nodelist: list of nodes we should check ssh communication with
366       - node-net-test: list of nodes we should check node daemon port
367         connectivity with
368       - hypervisor: list with hypervisors to run the verify for
369   @rtype: dict
370   @return: a dictionary with the same keys as the input dict, and
371       values representing the result of the checks
372
373   """
374   result = {}
375
376   if constants.NV_HYPERVISOR in what:
377     result[constants.NV_HYPERVISOR] = tmp = {}
378     for hv_name in what[constants.NV_HYPERVISOR]:
379       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
380
381   if constants.NV_FILELIST in what:
382     result[constants.NV_FILELIST] = utils.FingerprintFiles(
383       what[constants.NV_FILELIST])
384
385   if constants.NV_NODELIST in what:
386     result[constants.NV_NODELIST] = tmp = {}
387     random.shuffle(what[constants.NV_NODELIST])
388     for node in what[constants.NV_NODELIST]:
389       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
390       if not success:
391         tmp[node] = message
392
393   if constants.NV_NODENETTEST in what:
394     result[constants.NV_NODENETTEST] = tmp = {}
395     my_name = utils.HostInfo().name
396     my_pip = my_sip = None
397     for name, pip, sip in what[constants.NV_NODENETTEST]:
398       if name == my_name:
399         my_pip = pip
400         my_sip = sip
401         break
402     if not my_pip:
403       tmp[my_name] = ("Can't find my own primary/secondary IP"
404                       " in the node list")
405     else:
406       port = utils.GetNodeDaemonPort()
407       for name, pip, sip in what[constants.NV_NODENETTEST]:
408         fail = []
409         if not utils.TcpPing(pip, port, source=my_pip):
410           fail.append("primary")
411         if sip != pip:
412           if not utils.TcpPing(sip, port, source=my_sip):
413             fail.append("secondary")
414         if fail:
415           tmp[name] = ("failure using the %s interface(s)" %
416                        " and ".join(fail))
417
418   if constants.NV_LVLIST in what:
419     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
420
421   if constants.NV_INSTANCELIST in what:
422     result[constants.NV_INSTANCELIST] = GetInstanceList(
423       what[constants.NV_INSTANCELIST])
424
425   if constants.NV_VGLIST in what:
426     result[constants.NV_VGLIST] = ListVolumeGroups()
427
428   if constants.NV_VERSION in what:
429     result[constants.NV_VERSION] = constants.PROTOCOL_VERSION
430
431   if constants.NV_HVINFO in what:
432     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
433     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
434
435   return result
436
437
438 def GetVolumeList(vg_name):
439   """Compute list of logical volumes and their size.
440
441   @type vg_name: str
442   @param vg_name: the volume group whose LVs we should list
443   @rtype: dict
444   @return:
445       dictionary of all partions (key) with value being a tuple of
446       their size (in MiB), inactive and online status::
447
448         {'test1': ('20.06', True, True)}
449
450       in case of errors, a string is returned with the error
451       details.
452
453   """
454   lvs = {}
455   sep = '|'
456   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
457                          "--separator=%s" % sep,
458                          "-olv_name,lv_size,lv_attr", vg_name])
459   if result.failed:
460     logging.error("Failed to list logical volumes, lvs output: %s",
461                   result.output)
462     return result.output
463
464   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
465   for line in result.stdout.splitlines():
466     line = line.strip()
467     match = valid_line_re.match(line)
468     if not match:
469       logging.error("Invalid line returned from lvs output: '%s'", line)
470       continue
471     name, size, attr = match.groups()
472     inactive = attr[4] == '-'
473     online = attr[5] == 'o'
474     lvs[name] = (size, inactive, online)
475
476   return lvs
477
478
479 def ListVolumeGroups():
480   """List the volume groups and their size.
481
482   @rtype: dict
483   @return: dictionary with keys volume name and values the
484       size of the volume
485
486   """
487   return utils.ListVolumeGroups()
488
489
490 def NodeVolumes():
491   """List all volumes on this node.
492
493   @rtype: list
494   @return:
495     A list of dictionaries, each having four keys:
496       - name: the logical volume name,
497       - size: the size of the logical volume
498       - dev: the physical device on which the LV lives
499       - vg: the volume group to which it belongs
500
501     In case of errors, we return an empty list and log the
502     error.
503
504     Note that since a logical volume can live on multiple physical
505     volumes, the resulting list might include a logical volume
506     multiple times.
507
508   """
509   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
510                          "--separator=|",
511                          "--options=lv_name,lv_size,devices,vg_name"])
512   if result.failed:
513     logging.error("Failed to list logical volumes, lvs output: %s",
514                   result.output)
515     return []
516
517   def parse_dev(dev):
518     if '(' in dev:
519       return dev.split('(')[0]
520     else:
521       return dev
522
523   def map_line(line):
524     return {
525       'name': line[0].strip(),
526       'size': line[1].strip(),
527       'dev': parse_dev(line[2].strip()),
528       'vg': line[3].strip(),
529     }
530
531   return [map_line(line.split('|')) for line in result.stdout.splitlines()
532           if line.count('|') >= 3]
533
534
535 def BridgesExist(bridges_list):
536   """Check if a list of bridges exist on the current node.
537
538   @rtype: boolean
539   @return: C{True} if all of them exist, C{False} otherwise
540
541   """
542   for bridge in bridges_list:
543     if not utils.BridgeExists(bridge):
544       return False
545
546   return True
547
548
549 def GetInstanceList(hypervisor_list):
550   """Provides a list of instances.
551
552   @type hypervisor_list: list
553   @param hypervisor_list: the list of hypervisors to query information
554
555   @rtype: list
556   @return: a list of all running instances on the current node
557     - instance1.example.com
558     - instance2.example.com
559
560   """
561   results = []
562   for hname in hypervisor_list:
563     try:
564       names = hypervisor.GetHypervisor(hname).ListInstances()
565       results.extend(names)
566     except errors.HypervisorError, err:
567       logging.exception("Error enumerating instances for hypevisor %s", hname)
568       # FIXME: should we somehow not propagate this to the master?
569       raise
570
571   return results
572
573
574 def GetInstanceInfo(instance, hname):
575   """Gives back the informations about an instance as a dictionary.
576
577   @type instance: string
578   @param instance: the instance name
579   @type hname: string
580   @param hname: the hypervisor type of the instance
581
582   @rtype: dict
583   @return: dictionary with the following keys:
584       - memory: memory size of instance (int)
585       - state: xen state of instance (string)
586       - time: cpu time of instance (float)
587
588   """
589   output = {}
590
591   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
592   if iinfo is not None:
593     output['memory'] = iinfo[2]
594     output['state'] = iinfo[4]
595     output['time'] = iinfo[5]
596
597   return output
598
599
600 def GetAllInstancesInfo(hypervisor_list):
601   """Gather data about all instances.
602
603   This is the equivalent of L{GetInstanceInfo}, except that it
604   computes data for all instances at once, thus being faster if one
605   needs data about more than one instance.
606
607   @type hypervisor_list: list
608   @param hypervisor_list: list of hypervisors to query for instance data
609
610   @rtype: dict
611   @return: dictionary of instance: data, with data having the following keys:
612       - memory: memory size of instance (int)
613       - state: xen state of instance (string)
614       - time: cpu time of instance (float)
615       - vcpus: the number of vcpus
616
617   """
618   output = {}
619
620   for hname in hypervisor_list:
621     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
622     if iinfo:
623       for name, inst_id, memory, vcpus, state, times in iinfo:
624         value = {
625           'memory': memory,
626           'vcpus': vcpus,
627           'state': state,
628           'time': times,
629           }
630         if name in output and output[name] != value:
631           raise errors.HypervisorError("Instance %s running duplicate"
632                                        " with different parameters" % name)
633         output[name] = value
634
635   return output
636
637
638 def AddOSToInstance(instance):
639   """Add an OS to an instance.
640
641   @type instance: L{objects.Instance}
642   @param instance: Instance whose OS is to be installed
643   @rtype: boolean
644   @return: the success of the operation
645
646   """
647   inst_os = OSFromDisk(instance.os)
648
649   create_env = OSEnvironment(instance)
650
651   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
652                                      instance.name, int(time.time()))
653
654   result = utils.RunCmd([inst_os.create_script], env=create_env,
655                         cwd=inst_os.path, output=logfile,)
656   if result.failed:
657     logging.error("os create command '%s' returned error: %s, logfile: %s,"
658                   " output: %s", result.cmd, result.fail_reason, logfile,
659                   result.output)
660     return False
661
662   return True
663
664
665 def RunRenameInstance(instance, old_name):
666   """Run the OS rename script for an instance.
667
668   @type instance: L{objects.Instance}
669   @param instance: Instance whose OS is to be installed
670   @type old_name: string
671   @param old_name: previous instance name
672   @rtype: boolean
673   @return: the success of the operation
674
675   """
676   inst_os = OSFromDisk(instance.os)
677
678   rename_env = OSEnvironment(instance)
679   rename_env['OLD_INSTANCE_NAME'] = old_name
680
681   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
682                                            old_name,
683                                            instance.name, int(time.time()))
684
685   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
686                         cwd=inst_os.path, output=logfile)
687
688   if result.failed:
689     logging.error("os create command '%s' returned error: %s output: %s",
690                   result.cmd, result.fail_reason, result.output)
691     return False
692
693   return True
694
695
696 def _GetVGInfo(vg_name):
697   """Get informations about the volume group.
698
699   @type vg_name: str
700   @param vg_name: the volume group which we query
701   @rtype: dict
702   @return:
703     A dictionary with the following keys:
704       - C{vg_size} is the total size of the volume group in MiB
705       - C{vg_free} is the free size of the volume group in MiB
706       - C{pv_count} are the number of physical disks in that VG
707
708     If an error occurs during gathering of data, we return the same dict
709     with keys all set to None.
710
711   """
712   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
713
714   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
715                          "--nosuffix", "--units=m", "--separator=:", vg_name])
716
717   if retval.failed:
718     logging.error("volume group %s not present", vg_name)
719     return retdic
720   valarr = retval.stdout.strip().rstrip(':').split(':')
721   if len(valarr) == 3:
722     try:
723       retdic = {
724         "vg_size": int(round(float(valarr[0]), 0)),
725         "vg_free": int(round(float(valarr[1]), 0)),
726         "pv_count": int(valarr[2]),
727         }
728     except ValueError, err:
729       logging.exception("Fail to parse vgs output")
730   else:
731     logging.error("vgs output has the wrong number of fields (expected"
732                   " three): %s", str(valarr))
733   return retdic
734
735
736 def _SymlinkBlockDev(instance_name, device_path, device_name):
737   """Set up symlinks to a instance's block device.
738
739   This is an auxiliary function run when an instance is start (on the primary
740   node) or when an instance is migrated (on the target node).
741
742   Args:
743     instance_name: the name of the target instance
744     device_path: path of the physical block device, on the node
745     device_name: 'virtual' name of the device
746
747   Returns:
748     absolute path to the disk's symlink
749
750   """
751   link_basename = "%s-%s" % (instance_name, device_name)
752   link_name = os.path.join(constants.DISK_LINKS_DIR, link_basename)
753   try:
754     os.symlink(device_path, link_name)
755   except OSError, e:
756     if e.errno == errno.EEXIST:
757       if (not os.path.islink(link_name) or
758           os.readlink(link_name) != device_path):
759         os.remove(link_name)
760         os.symlink(device_path, link_name)
761     else:
762       raise
763
764   return link_name
765
766
767 def _RemoveBlockDevLinks(instance_name):
768   """Remove the block device symlinks belonging to the given instance.
769
770   """
771   for short_name in os.listdir(constants.DISK_LINKS_DIR):
772     link_name = os.path.join(constants.DISK_LINKS_DIR, short_name)
773     if (os.path.islink(link_name) and
774         short_name.startswith('%s-' % instance_name)):
775       try:
776         os.remove(link_name)
777       except OSError:
778         logging.exception("Can't remove symlink '%s'", link_name)
779
780
781 def _GatherAndLinkBlockDevs(instance):
782   """Set up an instance's block device(s).
783
784   This is run on the primary node at instance startup. The block
785   devices must be already assembled.
786
787   @type instance: L{objects.Instance}
788   @param instance: the instance whose disks we shoul assemble
789   @rtype: list
790   @return: list of (disk_object, device_path)
791
792   """
793   block_devices = []
794   for idx, disk in enumerate(instance.disks):
795     device = _RecursiveFindBD(disk)
796     if device is None:
797       raise errors.BlockDeviceError("Block device '%s' is not set up." %
798                                     str(disk))
799     device.Open()
800     try:
801       link_name = _SymlinkBlockDev(instance.name, device.dev_path,
802                                    "disk%d" % idx)
803     except OSError, e:
804       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
805                                     e.strerror)
806
807     block_devices.append((disk, link_name))
808
809   return block_devices
810
811
812 def StartInstance(instance, extra_args):
813   """Start an instance.
814
815   @type instance: L{objects.Instance}
816   @param instance: the instance object
817   @rtype: boolean
818   @return: whether the startup was successful or not
819
820   """
821   running_instances = GetInstanceList([instance.hypervisor])
822
823   if instance.name in running_instances:
824     return True
825
826   try:
827     block_devices = _GatherAndLinkBlockDevs(instance)
828     hyper = hypervisor.GetHypervisor(instance.hypervisor)
829     hyper.StartInstance(instance, block_devices, extra_args)
830   except errors.BlockDeviceError, err:
831     logging.exception("Failed to start instance")
832     return False
833   except errors.HypervisorError, err:
834     logging.exception("Failed to start instance")
835     _RemoveBlockDevLinks(instance.name)
836     return False
837
838   return True
839
840
841 def ShutdownInstance(instance):
842   """Shut an instance down.
843
844   @note: this functions uses polling with a hardcoded timeout.
845
846   @type instance: L{objects.Instance}
847   @param instance: the instance object
848   @rtype: boolean
849   @return: whether the startup was successful or not
850
851   """
852   hv_name = instance.hypervisor
853   running_instances = GetInstanceList([hv_name])
854
855   if instance.name not in running_instances:
856     return True
857
858   hyper = hypervisor.GetHypervisor(hv_name)
859   try:
860     hyper.StopInstance(instance)
861   except errors.HypervisorError, err:
862     logging.error("Failed to stop instance")
863     return False
864
865   # test every 10secs for 2min
866
867   time.sleep(1)
868   for dummy in range(11):
869     if instance.name not in GetInstanceList([hv_name]):
870       break
871     time.sleep(10)
872   else:
873     # the shutdown did not succeed
874     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
875
876     try:
877       hyper.StopInstance(instance, force=True)
878     except errors.HypervisorError, err:
879       logging.exception("Failed to stop instance")
880       return False
881
882     time.sleep(1)
883     if instance.name in GetInstanceList([hv_name]):
884       logging.error("could not shutdown instance '%s' even by destroy",
885                     instance.name)
886       return False
887
888   _RemoveBlockDevLinks(instance.name)
889
890   return True
891
892
893 def RebootInstance(instance, reboot_type, extra_args):
894   """Reboot an instance.
895
896   @type instance: L{objects.Instance}
897   @param instance: the instance object to reboot
898   @type reboot_type: str
899   @param reboot_type: the type of reboot, one the following
900     constants:
901       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
902         instance OS, do not recreate the VM
903       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
904         restart the VM (at the hypervisor level)
905       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
906         is not accepted here, since that mode is handled
907         differently
908   @rtype: boolean
909   @return: the success of the operation
910
911   """
912   running_instances = GetInstanceList([instance.hypervisor])
913
914   if instance.name not in running_instances:
915     logging.error("Cannot reboot instance that is not running")
916     return False
917
918   hyper = hypervisor.GetHypervisor(instance.hypervisor)
919   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
920     try:
921       hyper.RebootInstance(instance)
922     except errors.HypervisorError, err:
923       logging.exception("Failed to soft reboot instance")
924       return False
925   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
926     try:
927       ShutdownInstance(instance)
928       StartInstance(instance, extra_args)
929     except errors.HypervisorError, err:
930       logging.exception("Failed to hard reboot instance")
931       return False
932   else:
933     raise errors.ParameterError("reboot_type invalid")
934
935   return True
936
937
938 def MigrateInstance(instance, target, live):
939   """Migrates an instance to another node.
940
941   @type instance: L{objects.Instance}
942   @param instance: the instance definition
943   @type target: string
944   @param target: the target node name
945   @type live: boolean
946   @param live: whether the migration should be done live or not (the
947       interpretation of this parameter is left to the hypervisor)
948   @rtype: tuple
949   @return: a tuple of (success, msg) where:
950       - succes is a boolean denoting the success/failure of the operation
951       - msg is a string with details in case of failure
952
953   """
954   hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
955
956   try:
957     hyper.MigrateInstance(instance.name, target, live)
958   except errors.HypervisorError, err:
959     msg = "Failed to migrate instance: %s" % str(err)
960     logging.error(msg)
961     return (False, msg)
962   return (True, "Migration successfull")
963
964
965 def CreateBlockDevice(disk, size, owner, on_primary, info):
966   """Creates a block device for an instance.
967
968   @type disk: L{objects.Disk}
969   @param disk: the object describing the disk we should create
970   @type size: int
971   @param size: the size of the physical underlying device, in MiB
972   @type owner: str
973   @param owner: the name of the instance for which disk is created,
974       used for device cache data
975   @type on_primary: boolean
976   @param on_primary:  indicates if it is the primary node or not
977   @type info: string
978   @param info: string that will be sent to the physical device
979       creation, used for example to set (LVM) tags on LVs
980
981   @return: the new unique_id of the device (this can sometime be
982       computed only after creation), or None. On secondary nodes,
983       it's not required to return anything.
984
985   """
986   clist = []
987   if disk.children:
988     for child in disk.children:
989       crdev = _RecursiveAssembleBD(child, owner, on_primary)
990       if on_primary or disk.AssembleOnSecondary():
991         # we need the children open in case the device itself has to
992         # be assembled
993         crdev.Open()
994       clist.append(crdev)
995   try:
996     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
997     if device is not None:
998       logging.info("removing existing device %s", disk)
999       device.Remove()
1000   except errors.BlockDeviceError, err:
1001     pass
1002
1003   device = bdev.Create(disk.dev_type, disk.physical_id,
1004                        clist, size)
1005   if device is None:
1006     raise ValueError("Can't create child device for %s, %s" %
1007                      (disk, size))
1008   if on_primary or disk.AssembleOnSecondary():
1009     if not device.Assemble():
1010       errorstring = "Can't assemble device after creation"
1011       logging.error(errorstring)
1012       raise errors.BlockDeviceError("%s, very unusual event - check the node"
1013                                     " daemon logs" % errorstring)
1014     device.SetSyncSpeed(constants.SYNC_SPEED)
1015     if on_primary or disk.OpenOnSecondary():
1016       device.Open(force=True)
1017     DevCacheManager.UpdateCache(device.dev_path, owner,
1018                                 on_primary, disk.iv_name)
1019
1020   device.SetInfo(info)
1021
1022   physical_id = device.unique_id
1023   return physical_id
1024
1025
1026 def RemoveBlockDevice(disk):
1027   """Remove a block device.
1028
1029   @note: This is intended to be called recursively.
1030
1031   @type disk: L{objects.Disk}
1032   @param disk: the disk object we should remove
1033   @rtype: boolean
1034   @return: the success of the operation
1035
1036   """
1037   try:
1038     rdev = _RecursiveFindBD(disk)
1039   except errors.BlockDeviceError, err:
1040     # probably can't attach
1041     logging.info("Can't attach to device %s in remove", disk)
1042     rdev = None
1043   if rdev is not None:
1044     r_path = rdev.dev_path
1045     result = rdev.Remove()
1046     if result:
1047       DevCacheManager.RemoveCache(r_path)
1048   else:
1049     result = True
1050   if disk.children:
1051     for child in disk.children:
1052       result = result and RemoveBlockDevice(child)
1053   return result
1054
1055
1056 def _RecursiveAssembleBD(disk, owner, as_primary):
1057   """Activate a block device for an instance.
1058
1059   This is run on the primary and secondary nodes for an instance.
1060
1061   @note: this function is called recursively.
1062
1063   @type disk: L{objects.Disk}
1064   @param disk: the disk we try to assemble
1065   @type owner: str
1066   @param owner: the name of the instance which owns the disk
1067   @type as_primary: boolean
1068   @param as_primary: if we should make the block device
1069       read/write
1070
1071   @return: the assembled device or None (in case no device
1072       was assembled)
1073   @raise errors.BlockDeviceError: in case there is an error
1074       during the activation of the children or the device
1075       itself
1076
1077   """
1078   children = []
1079   if disk.children:
1080     mcn = disk.ChildrenNeeded()
1081     if mcn == -1:
1082       mcn = 0 # max number of Nones allowed
1083     else:
1084       mcn = len(disk.children) - mcn # max number of Nones
1085     for chld_disk in disk.children:
1086       try:
1087         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1088       except errors.BlockDeviceError, err:
1089         if children.count(None) >= mcn:
1090           raise
1091         cdev = None
1092         logging.debug("Error in child activation: %s", str(err))
1093       children.append(cdev)
1094
1095   if as_primary or disk.AssembleOnSecondary():
1096     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
1097     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1098     result = r_dev
1099     if as_primary or disk.OpenOnSecondary():
1100       r_dev.Open()
1101     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1102                                 as_primary, disk.iv_name)
1103
1104   else:
1105     result = True
1106   return result
1107
1108
1109 def AssembleBlockDevice(disk, owner, as_primary):
1110   """Activate a block device for an instance.
1111
1112   This is a wrapper over _RecursiveAssembleBD.
1113
1114   @rtype: str or boolean
1115   @return: a C{/dev/...} path for primary nodes, and
1116       C{True} for secondary nodes
1117
1118   """
1119   result = _RecursiveAssembleBD(disk, owner, as_primary)
1120   if isinstance(result, bdev.BlockDev):
1121     result = result.dev_path
1122   return result
1123
1124
1125 def ShutdownBlockDevice(disk):
1126   """Shut down a block device.
1127
1128   First, if the device is assembled (Attach() is successfull), then
1129   the device is shutdown. Then the children of the device are
1130   shutdown.
1131
1132   This function is called recursively. Note that we don't cache the
1133   children or such, as oppossed to assemble, shutdown of different
1134   devices doesn't require that the upper device was active.
1135
1136   @type disk: L{objects.Disk}
1137   @param disk: the description of the disk we should
1138       shutdown
1139   @rtype: boolean
1140   @return: the success of the operation
1141
1142   """
1143   r_dev = _RecursiveFindBD(disk)
1144   if r_dev is not None:
1145     r_path = r_dev.dev_path
1146     result = r_dev.Shutdown()
1147     if result:
1148       DevCacheManager.RemoveCache(r_path)
1149   else:
1150     result = True
1151   if disk.children:
1152     for child in disk.children:
1153       result = result and ShutdownBlockDevice(child)
1154   return result
1155
1156
1157 def MirrorAddChildren(parent_cdev, new_cdevs):
1158   """Extend a mirrored block device.
1159
1160   @type parent_cdev: L{objects.Disk}
1161   @param parent_cdev: the disk to which we should add children
1162   @type new_cdevs: list of L{objects.Disk}
1163   @param new_cdevs: the list of children which we should add
1164   @rtype: boolean
1165   @return: the success of the operation
1166
1167   """
1168   parent_bdev = _RecursiveFindBD(parent_cdev)
1169   if parent_bdev is None:
1170     logging.error("Can't find parent device")
1171     return False
1172   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1173   if new_bdevs.count(None) > 0:
1174     logging.error("Can't find new device(s) to add: %s:%s",
1175                   new_bdevs, new_cdevs)
1176     return False
1177   parent_bdev.AddChildren(new_bdevs)
1178   return True
1179
1180
1181 def MirrorRemoveChildren(parent_cdev, new_cdevs):
1182   """Shrink a mirrored block device.
1183
1184   @type parent_cdev: L{objects.Disk}
1185   @param parent_cdev: the disk from which we should remove children
1186   @type new_cdevs: list of L{objects.Disk}
1187   @param new_cdevs: the list of children which we should remove
1188   @rtype: boolean
1189   @return: the success of the operation
1190
1191   """
1192   parent_bdev = _RecursiveFindBD(parent_cdev)
1193   if parent_bdev is None:
1194     logging.error("Can't find parent in remove children: %s", parent_cdev)
1195     return False
1196   devs = []
1197   for disk in new_cdevs:
1198     rpath = disk.StaticDevPath()
1199     if rpath is None:
1200       bd = _RecursiveFindBD(disk)
1201       if bd is None:
1202         logging.error("Can't find dynamic device %s while removing children",
1203                       disk)
1204         return False
1205       else:
1206         devs.append(bd.dev_path)
1207     else:
1208       devs.append(rpath)
1209   parent_bdev.RemoveChildren(devs)
1210   return True
1211
1212
1213 def GetMirrorStatus(disks):
1214   """Get the mirroring status of a list of devices.
1215
1216   @type disks: list of L{objects.Disk}
1217   @param disks: the list of disks which we should query
1218   @rtype: disk
1219   @return:
1220       a list of (mirror_done, estimated_time) tuples, which
1221       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1222   @raise errors.BlockDeviceError: if any of the disks cannot be
1223       found
1224
1225   """
1226   stats = []
1227   for dsk in disks:
1228     rbd = _RecursiveFindBD(dsk)
1229     if rbd is None:
1230       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1231     stats.append(rbd.CombinedSyncStatus())
1232   return stats
1233
1234
1235 def _RecursiveFindBD(disk):
1236   """Check if a device is activated.
1237
1238   If so, return informations about the real device.
1239
1240   @type disk: L{objects.Disk}
1241   @param disk: the disk object we need to find
1242
1243   @return: None if the device can't be found,
1244       otherwise the device instance
1245
1246   """
1247   children = []
1248   if disk.children:
1249     for chdisk in disk.children:
1250       children.append(_RecursiveFindBD(chdisk))
1251
1252   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1253
1254
1255 def FindBlockDevice(disk):
1256   """Check if a device is activated.
1257
1258   If it is, return informations about the real device.
1259
1260   @type disk: L{objects.Disk}
1261   @param disk: the disk to find
1262   @rtype: None or tuple
1263   @return: None if the disk cannot be found, otherwise a
1264       tuple (device_path, major, minor, sync_percent,
1265       estimated_time, is_degraded)
1266
1267   """
1268   rbd = _RecursiveFindBD(disk)
1269   if rbd is None:
1270     return rbd
1271   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1272
1273
1274 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1275   """Write a file to the filesystem.
1276
1277   This allows the master to overwrite(!) a file. It will only perform
1278   the operation if the file belongs to a list of configuration files.
1279
1280   @type file_name: str
1281   @param file_name: the target file name
1282   @type data: str
1283   @param data: the new contents of the file
1284   @type mode: int
1285   @param mode: the mode to give the file (can be None)
1286   @type uid: int
1287   @param uid: the owner of the file (can be -1 for default)
1288   @type gid: int
1289   @param gid: the group of the file (can be -1 for default)
1290   @type atime: float
1291   @param atime: the atime to set on the file (can be None)
1292   @type mtime: float
1293   @param mtime: the mtime to set on the file (can be None)
1294   @rtype: boolean
1295   @return: the success of the operation; errors are logged
1296       in the node daemon log
1297
1298   """
1299   if not os.path.isabs(file_name):
1300     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1301                   file_name)
1302     return False
1303
1304   allowed_files = [
1305     constants.CLUSTER_CONF_FILE,
1306     constants.ETC_HOSTS,
1307     constants.SSH_KNOWN_HOSTS_FILE,
1308     constants.VNC_PASSWORD_FILE,
1309     ]
1310
1311   if file_name not in allowed_files:
1312     logging.error("Filename passed to UploadFile not in allowed"
1313                  " upload targets: '%s'", file_name)
1314     return False
1315
1316   raw_data = _Decompress(data)
1317
1318   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1319                   atime=atime, mtime=mtime)
1320   return True
1321
1322
1323 def WriteSsconfFiles(values):
1324   """Update all ssconf files.
1325
1326   Wrapper around the SimpleStore.WriteFiles.
1327
1328   """
1329   ssconf.SimpleStore().WriteFiles(values)
1330
1331
1332 def _ErrnoOrStr(err):
1333   """Format an EnvironmentError exception.
1334
1335   If the L{err} argument has an errno attribute, it will be looked up
1336   and converted into a textual C{E...} description. Otherwise the
1337   string representation of the error will be returned.
1338
1339   @type err: L{EnvironmentError}
1340   @param err: the exception to format
1341
1342   """
1343   if hasattr(err, 'errno'):
1344     detail = errno.errorcode[err.errno]
1345   else:
1346     detail = str(err)
1347   return detail
1348
1349
1350 def _OSOndiskVersion(name, os_dir):
1351   """Compute and return the API version of a given OS.
1352
1353   This function will try to read the API version of the OS given by
1354   the 'name' parameter and residing in the 'os_dir' directory.
1355
1356   @type name: str
1357   @param name: the OS name we should look for
1358   @type os_dir: str
1359   @param os_dir: the directory inwhich we should look for the OS
1360   @rtype: int or None
1361   @return:
1362       Either an integer denoting the version or None in the
1363       case when this is not a valid OS name.
1364   @raise errors.InvalidOS: if the OS cannot be found
1365
1366   """
1367   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1368
1369   try:
1370     st = os.stat(api_file)
1371   except EnvironmentError, err:
1372     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1373                            " found (%s)" % _ErrnoOrStr(err))
1374
1375   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1376     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1377                            " a regular file")
1378
1379   try:
1380     f = open(api_file)
1381     try:
1382       api_versions = f.readlines()
1383     finally:
1384       f.close()
1385   except EnvironmentError, err:
1386     raise errors.InvalidOS(name, os_dir, "error while reading the"
1387                            " API version (%s)" % _ErrnoOrStr(err))
1388
1389   api_versions = [version.strip() for version in api_versions]
1390   try:
1391     api_versions = [int(version) for version in api_versions]
1392   except (TypeError, ValueError), err:
1393     raise errors.InvalidOS(name, os_dir,
1394                            "API version is not integer (%s)" % str(err))
1395
1396   return api_versions
1397
1398
1399 def DiagnoseOS(top_dirs=None):
1400   """Compute the validity for all OSes.
1401
1402   @type top_dirs: list
1403   @param top_dirs: the list of directories in which to
1404       search (if not given defaults to
1405       L{constants.OS_SEARCH_PATH})
1406   @rtype: list of L{objects.OS}
1407   @return: an OS object for each name in all the given
1408       directories
1409
1410   """
1411   if top_dirs is None:
1412     top_dirs = constants.OS_SEARCH_PATH
1413
1414   result = []
1415   for dir_name in top_dirs:
1416     if os.path.isdir(dir_name):
1417       try:
1418         f_names = utils.ListVisibleFiles(dir_name)
1419       except EnvironmentError, err:
1420         logging.exception("Can't list the OS directory %s", dir_name)
1421         break
1422       for name in f_names:
1423         try:
1424           os_inst = OSFromDisk(name, base_dir=dir_name)
1425           result.append(os_inst)
1426         except errors.InvalidOS, err:
1427           result.append(objects.OS.FromInvalidOS(err))
1428
1429   return result
1430
1431
1432 def OSFromDisk(name, base_dir=None):
1433   """Create an OS instance from disk.
1434
1435   This function will return an OS instance if the given name is a
1436   valid OS name. Otherwise, it will raise an appropriate
1437   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1438
1439   @type base_dir: string
1440   @keyword base_dir: Base directory containing OS installations.
1441                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1442   @rtype: L{objects.OS}
1443   @return: the OS instance if we find a valid one
1444   @raise errors.InvalidOS: if we don't find a valid OS
1445
1446   """
1447   if base_dir is None:
1448     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1449     if os_dir is None:
1450       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1451   else:
1452     os_dir = os.path.sep.join([base_dir, name])
1453
1454   api_versions = _OSOndiskVersion(name, os_dir)
1455
1456   if constants.OS_API_VERSION not in api_versions:
1457     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1458                            " (found %s want %s)"
1459                            % (api_versions, constants.OS_API_VERSION))
1460
1461   # OS Scripts dictionary, we will populate it with the actual script names
1462   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1463
1464   for script in os_scripts:
1465     os_scripts[script] = os.path.sep.join([os_dir, script])
1466
1467     try:
1468       st = os.stat(os_scripts[script])
1469     except EnvironmentError, err:
1470       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1471                              (script, _ErrnoOrStr(err)))
1472
1473     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1474       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1475                              script)
1476
1477     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1478       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1479                              script)
1480
1481
1482   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1483                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1484                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1485                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1486                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1487                     api_versions=api_versions)
1488
1489 def OSEnvironment(instance, debug=0):
1490   """Calculate the environment for an os script.
1491
1492   @type instance: L{objects.Instance}
1493   @param instance: target instance for the os script run
1494   @type debug: integer
1495   @param debug: debug level (0 or 1, for OS Api 10)
1496   @rtype: dict
1497   @return: dict of environment variables
1498   @raise errors.BlockDeviceError: if the block device
1499       cannot be found
1500
1501   """
1502   result = {}
1503   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1504   result['INSTANCE_NAME'] = instance.name
1505   result['HYPERVISOR'] = instance.hypervisor
1506   result['DISK_COUNT'] = '%d' % len(instance.disks)
1507   result['NIC_COUNT'] = '%d' % len(instance.nics)
1508   result['DEBUG_LEVEL'] = '%d' % debug
1509   for idx, disk in enumerate(instance.disks):
1510     real_disk = _RecursiveFindBD(disk)
1511     if real_disk is None:
1512       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1513                                     str(disk))
1514     real_disk.Open()
1515     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1516     # FIXME: When disks will have read-only mode, populate this
1517     result['DISK_%d_ACCESS' % idx] = 'W'
1518     if constants.HV_DISK_TYPE in instance.hvparams:
1519       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1520         instance.hvparams[constants.HV_DISK_TYPE]
1521     if disk.dev_type in constants.LDS_BLOCK:
1522       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1523     elif disk.dev_type == constants.LD_FILE:
1524       result['DISK_%d_BACKEND_TYPE' % idx] = \
1525         'file:%s' % disk.physical_id[0]
1526   for idx, nic in enumerate(instance.nics):
1527     result['NIC_%d_MAC' % idx] = nic.mac
1528     if nic.ip:
1529       result['NIC_%d_IP' % idx] = nic.ip
1530     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1531     if constants.HV_NIC_TYPE in instance.hvparams:
1532       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1533         instance.hvparams[constants.HV_NIC_TYPE]
1534
1535   return result
1536
1537 def GrowBlockDevice(disk, amount):
1538   """Grow a stack of block devices.
1539
1540   This function is called recursively, with the childrens being the
1541   first ones to resize.
1542
1543   @type disk: L{objects.Disk}
1544   @param disk: the disk to be grown
1545   @rtype: (status, result)
1546   @return: a tuple with the status of the operation
1547       (True/False), and the errors message if status
1548       is False
1549
1550   """
1551   r_dev = _RecursiveFindBD(disk)
1552   if r_dev is None:
1553     return False, "Cannot find block device %s" % (disk,)
1554
1555   try:
1556     r_dev.Grow(amount)
1557   except errors.BlockDeviceError, err:
1558     return False, str(err)
1559
1560   return True, None
1561
1562
1563 def SnapshotBlockDevice(disk):
1564   """Create a snapshot copy of a block device.
1565
1566   This function is called recursively, and the snapshot is actually created
1567   just for the leaf lvm backend device.
1568
1569   @type disk: L{objects.Disk}
1570   @param disk: the disk to be snapshotted
1571   @rtype: string
1572   @return: snapshot disk path
1573
1574   """
1575   if disk.children:
1576     if len(disk.children) == 1:
1577       # only one child, let's recurse on it
1578       return SnapshotBlockDevice(disk.children[0])
1579     else:
1580       # more than one child, choose one that matches
1581       for child in disk.children:
1582         if child.size == disk.size:
1583           # return implies breaking the loop
1584           return SnapshotBlockDevice(child)
1585   elif disk.dev_type == constants.LD_LV:
1586     r_dev = _RecursiveFindBD(disk)
1587     if r_dev is not None:
1588       # let's stay on the safe side and ask for the full size, for now
1589       return r_dev.Snapshot(disk.size)
1590     else:
1591       return None
1592   else:
1593     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1594                                  " '%s' of type '%s'" %
1595                                  (disk.unique_id, disk.dev_type))
1596
1597
1598 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1599   """Export a block device snapshot to a remote node.
1600
1601   @type disk: L{objects.Disk}
1602   @param disk: the description of the disk to export
1603   @type dest_node: str
1604   @param dest_node: the destination node to export to
1605   @type instance: L{objects.Instance}
1606   @param instance: the instance object to whom the disk belongs
1607   @type cluster_name: str
1608   @param cluster_name: the cluster name, needed for SSH hostalias
1609   @type idx: int
1610   @param idx: the index of the disk in the instance's disk list,
1611       used to export to the OS scripts environment
1612   @rtype: boolean
1613   @return: the success of the operation
1614
1615   """
1616   export_env = OSEnvironment(instance)
1617
1618   inst_os = OSFromDisk(instance.os)
1619   export_script = inst_os.export_script
1620
1621   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1622                                      instance.name, int(time.time()))
1623   if not os.path.exists(constants.LOG_OS_DIR):
1624     os.mkdir(constants.LOG_OS_DIR, 0750)
1625   real_disk = _RecursiveFindBD(disk)
1626   if real_disk is None:
1627     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1628                                   str(disk))
1629   real_disk.Open()
1630
1631   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1632   export_env['EXPORT_INDEX'] = str(idx)
1633
1634   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1635   destfile = disk.physical_id[1]
1636
1637   # the target command is built out of three individual commands,
1638   # which are joined by pipes; we check each individual command for
1639   # valid parameters
1640   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1641                                export_script, logfile)
1642
1643   comprcmd = "gzip"
1644
1645   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1646                                 destdir, destdir, destfile)
1647   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1648                                                    constants.GANETI_RUNAS,
1649                                                    destcmd)
1650
1651   # all commands have been checked, so we're safe to combine them
1652   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1653
1654   result = utils.RunCmd(command, env=export_env)
1655
1656   if result.failed:
1657     logging.error("os snapshot export command '%s' returned error: %s"
1658                   " output: %s", command, result.fail_reason, result.output)
1659     return False
1660
1661   return True
1662
1663
1664 def FinalizeExport(instance, snap_disks):
1665   """Write out the export configuration information.
1666
1667   @type instance: L{objects.Instance}
1668   @param instance: the instance which we export, used for
1669       saving configuration
1670   @type snap_disks: list of L{objects.Disk}
1671   @param snap_disks: list of snapshot block devices, which
1672       will be used to get the actual name of the dump file
1673
1674   @rtype: boolean
1675   @return: the success of the operation
1676
1677   """
1678   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1679   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1680
1681   config = objects.SerializableConfigParser()
1682
1683   config.add_section(constants.INISECT_EXP)
1684   config.set(constants.INISECT_EXP, 'version', '0')
1685   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1686   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1687   config.set(constants.INISECT_EXP, 'os', instance.os)
1688   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1689
1690   config.add_section(constants.INISECT_INS)
1691   config.set(constants.INISECT_INS, 'name', instance.name)
1692   config.set(constants.INISECT_INS, 'memory', '%d' %
1693              instance.beparams[constants.BE_MEMORY])
1694   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1695              instance.beparams[constants.BE_VCPUS])
1696   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1697
1698   nic_total = 0
1699   for nic_count, nic in enumerate(instance.nics):
1700     nic_total += 1
1701     config.set(constants.INISECT_INS, 'nic%d_mac' %
1702                nic_count, '%s' % nic.mac)
1703     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1704     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1705                '%s' % nic.bridge)
1706   # TODO: redundant: on load can read nics until it doesn't exist
1707   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1708
1709   disk_total = 0
1710   for disk_count, disk in enumerate(snap_disks):
1711     if disk:
1712       disk_total += 1
1713       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1714                  ('%s' % disk.iv_name))
1715       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1716                  ('%s' % disk.physical_id[1]))
1717       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1718                  ('%d' % disk.size))
1719
1720   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1721
1722   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1723                   data=config.Dumps())
1724   shutil.rmtree(finaldestdir, True)
1725   shutil.move(destdir, finaldestdir)
1726
1727   return True
1728
1729
1730 def ExportInfo(dest):
1731   """Get export configuration information.
1732
1733   @type dest: str
1734   @param dest: directory containing the export
1735
1736   @rtype: L{objects.SerializableConfigParser}
1737   @return: a serializable config file containing the
1738       export info
1739
1740   """
1741   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1742
1743   config = objects.SerializableConfigParser()
1744   config.read(cff)
1745
1746   if (not config.has_section(constants.INISECT_EXP) or
1747       not config.has_section(constants.INISECT_INS)):
1748     return None
1749
1750   return config
1751
1752
1753 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1754   """Import an os image into an instance.
1755
1756   @type instance: L{objects.Instance}
1757   @param instance: instance to import the disks into
1758   @type src_node: string
1759   @param src_node: source node for the disk images
1760   @type src_images: list of string
1761   @param src_images: absolute paths of the disk images
1762   @rtype: list of boolean
1763   @return: each boolean represent the success of importing the n-th disk
1764
1765   """
1766   import_env = OSEnvironment(instance)
1767   inst_os = OSFromDisk(instance.os)
1768   import_script = inst_os.import_script
1769
1770   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1771                                         instance.name, int(time.time()))
1772   if not os.path.exists(constants.LOG_OS_DIR):
1773     os.mkdir(constants.LOG_OS_DIR, 0750)
1774
1775   comprcmd = "gunzip"
1776   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1777                                import_script, logfile)
1778
1779   final_result = []
1780   for idx, image in enumerate(src_images):
1781     if image:
1782       destcmd = utils.BuildShellCmd('cat %s', image)
1783       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1784                                                        constants.GANETI_RUNAS,
1785                                                        destcmd)
1786       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1787       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1788       import_env['IMPORT_INDEX'] = str(idx)
1789       result = utils.RunCmd(command, env=import_env)
1790       if result.failed:
1791         logging.error("Disk import command '%s' returned error: %s"
1792                       " output: %s", command, result.fail_reason,
1793                       result.output)
1794         final_result.append(False)
1795       else:
1796         final_result.append(True)
1797     else:
1798       final_result.append(True)
1799
1800   return final_result
1801
1802
1803 def ListExports():
1804   """Return a list of exports currently available on this machine.
1805
1806   @rtype: list
1807   @return: list of the exports
1808
1809   """
1810   if os.path.isdir(constants.EXPORT_DIR):
1811     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1812   else:
1813     return []
1814
1815
1816 def RemoveExport(export):
1817   """Remove an existing export from the node.
1818
1819   @type export: str
1820   @param export: the name of the export to remove
1821   @rtype: boolean
1822   @return: the success of the operation
1823
1824   """
1825   target = os.path.join(constants.EXPORT_DIR, export)
1826
1827   shutil.rmtree(target)
1828   # TODO: catch some of the relevant exceptions and provide a pretty
1829   # error message if rmtree fails.
1830
1831   return True
1832
1833
1834 def RenameBlockDevices(devlist):
1835   """Rename a list of block devices.
1836
1837   @type devlist: list of tuples
1838   @param devlist: list of tuples of the form  (disk,
1839       new_logical_id, new_physical_id); disk is an
1840       L{objects.Disk} object describing the current disk,
1841       and new logical_id/physical_id is the name we
1842       rename it to
1843   @rtype: boolean
1844   @return: True if all renames succeeded, False otherwise
1845
1846   """
1847   result = True
1848   for disk, unique_id in devlist:
1849     dev = _RecursiveFindBD(disk)
1850     if dev is None:
1851       result = False
1852       continue
1853     try:
1854       old_rpath = dev.dev_path
1855       dev.Rename(unique_id)
1856       new_rpath = dev.dev_path
1857       if old_rpath != new_rpath:
1858         DevCacheManager.RemoveCache(old_rpath)
1859         # FIXME: we should add the new cache information here, like:
1860         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1861         # but we don't have the owner here - maybe parse from existing
1862         # cache? for now, we only lose lvm data when we rename, which
1863         # is less critical than DRBD or MD
1864     except errors.BlockDeviceError, err:
1865       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1866       result = False
1867   return result
1868
1869
1870 def _TransformFileStorageDir(file_storage_dir):
1871   """Checks whether given file_storage_dir is valid.
1872
1873   Checks wheter the given file_storage_dir is within the cluster-wide
1874   default file_storage_dir stored in SimpleStore. Only paths under that
1875   directory are allowed.
1876
1877   @type file_storage_dir: str
1878   @param file_storage_dir: the path to check
1879
1880   @return: the normalized path if valid, None otherwise
1881
1882   """
1883   cfg = _GetConfig()
1884   file_storage_dir = os.path.normpath(file_storage_dir)
1885   base_file_storage_dir = cfg.GetFileStorageDir()
1886   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1887       base_file_storage_dir):
1888     logging.error("file storage directory '%s' is not under base file"
1889                   " storage directory '%s'",
1890                   file_storage_dir, base_file_storage_dir)
1891     return None
1892   return file_storage_dir
1893
1894
1895 def CreateFileStorageDir(file_storage_dir):
1896   """Create file storage directory.
1897
1898   @type file_storage_dir: str
1899   @param file_storage_dir: directory to create
1900
1901   @rtype: tuple
1902   @return: tuple with first element a boolean indicating wheter dir
1903       creation was successful or not
1904
1905   """
1906   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1907   result = True,
1908   if not file_storage_dir:
1909     result = False,
1910   else:
1911     if os.path.exists(file_storage_dir):
1912       if not os.path.isdir(file_storage_dir):
1913         logging.error("'%s' is not a directory", file_storage_dir)
1914         result = False,
1915     else:
1916       try:
1917         os.makedirs(file_storage_dir, 0750)
1918       except OSError, err:
1919         logging.error("Cannot create file storage directory '%s': %s",
1920                       file_storage_dir, err)
1921         result = False,
1922   return result
1923
1924
1925 def RemoveFileStorageDir(file_storage_dir):
1926   """Remove file storage directory.
1927
1928   Remove it only if it's empty. If not log an error and return.
1929
1930   @type file_storage_dir: str
1931   @param file_storage_dir: the directory we should cleanup
1932   @rtype: tuple (success,)
1933   @return: tuple of one element, C{success}, denoting
1934       whether the operation was successfull
1935
1936   """
1937   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1938   result = True,
1939   if not file_storage_dir:
1940     result = False,
1941   else:
1942     if os.path.exists(file_storage_dir):
1943       if not os.path.isdir(file_storage_dir):
1944         logging.error("'%s' is not a directory", file_storage_dir)
1945         result = False,
1946       # deletes dir only if empty, otherwise we want to return False
1947       try:
1948         os.rmdir(file_storage_dir)
1949       except OSError, err:
1950         logging.exception("Cannot remove file storage directory '%s'",
1951                           file_storage_dir)
1952         result = False,
1953   return result
1954
1955
1956 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1957   """Rename the file storage directory.
1958
1959   @type old_file_storage_dir: str
1960   @param old_file_storage_dir: the current path
1961   @type new_file_storage_dir: str
1962   @param new_file_storage_dir: the name we should rename to
1963   @rtype: tuple (success,)
1964   @return: tuple of one element, C{success}, denoting
1965       whether the operation was successful
1966
1967   """
1968   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1969   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1970   result = True,
1971   if not old_file_storage_dir or not new_file_storage_dir:
1972     result = False,
1973   else:
1974     if not os.path.exists(new_file_storage_dir):
1975       if os.path.isdir(old_file_storage_dir):
1976         try:
1977           os.rename(old_file_storage_dir, new_file_storage_dir)
1978         except OSError, err:
1979           logging.exception("Cannot rename '%s' to '%s'",
1980                             old_file_storage_dir, new_file_storage_dir)
1981           result =  False,
1982       else:
1983         logging.error("'%s' is not a directory", old_file_storage_dir)
1984         result = False,
1985     else:
1986       if os.path.exists(old_file_storage_dir):
1987         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1988                       old_file_storage_dir, new_file_storage_dir)
1989         result = False,
1990   return result
1991
1992
1993 def _IsJobQueueFile(file_name):
1994   """Checks whether the given filename is in the queue directory.
1995
1996   @type file_name: str
1997   @param file_name: the file name we should check
1998   @rtype: boolean
1999   @return: whether the file is under the queue directory
2000
2001   """
2002   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2003   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2004
2005   if not result:
2006     logging.error("'%s' is not a file in the queue directory",
2007                   file_name)
2008
2009   return result
2010
2011
2012 def JobQueueUpdate(file_name, content):
2013   """Updates a file in the queue directory.
2014
2015   This is just a wrapper over L{utils.WriteFile}, with proper
2016   checking.
2017
2018   @type file_name: str
2019   @param file_name: the job file name
2020   @type content: str
2021   @param content: the new job contents
2022   @rtype: boolean
2023   @return: the success of the operation
2024
2025   """
2026   if not _IsJobQueueFile(file_name):
2027     return False
2028
2029   # Write and replace the file atomically
2030   utils.WriteFile(file_name, data=_Decompress(content))
2031
2032   return True
2033
2034
2035 def JobQueueRename(old, new):
2036   """Renames a job queue file.
2037
2038   This is just a wrapper over os.rename with proper checking.
2039
2040   @type old: str
2041   @param old: the old (actual) file name
2042   @type new: str
2043   @param new: the desired file name
2044   @rtype: boolean
2045   @return: the success of the operation
2046
2047   """
2048   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2049     return False
2050
2051   utils.RenameFile(old, new, mkdir=True)
2052
2053   return True
2054
2055
2056 def JobQueueSetDrainFlag(drain_flag):
2057   """Set the drain flag for the queue.
2058
2059   This will set or unset the queue drain flag.
2060
2061   @type drain_flag: boolean
2062   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2063   @rtype: boolean
2064   @return: always True
2065   @warning: the function always returns True
2066
2067   """
2068   if drain_flag:
2069     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2070   else:
2071     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2072
2073   return True
2074
2075
2076 def CloseBlockDevices(disks):
2077   """Closes the given block devices.
2078
2079   This means they will be switched to secondary mode (in case of
2080   DRBD).
2081
2082   @type disks: list of L{objects.Disk}
2083   @param disks: the list of disks to be closed
2084   @rtype: tuple (success, message)
2085   @return: a tuple of success and message, where success
2086       indicates the succes of the operation, and message
2087       which will contain the error details in case we
2088       failed
2089
2090   """
2091   bdevs = []
2092   for cf in disks:
2093     rd = _RecursiveFindBD(cf)
2094     if rd is None:
2095       return (False, "Can't find device %s" % cf)
2096     bdevs.append(rd)
2097
2098   msg = []
2099   for rd in bdevs:
2100     try:
2101       rd.Close()
2102     except errors.BlockDeviceError, err:
2103       msg.append(str(err))
2104   if msg:
2105     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2106   else:
2107     return (True, "All devices secondary")
2108
2109
2110 def ValidateHVParams(hvname, hvparams):
2111   """Validates the given hypervisor parameters.
2112
2113   @type hvname: string
2114   @param hvname: the hypervisor name
2115   @type hvparams: dict
2116   @param hvparams: the hypervisor parameters to be validated
2117   @rtype: tuple (success, message)
2118   @return: a tuple of success and message, where success
2119       indicates the succes of the operation, and message
2120       which will contain the error details in case we
2121       failed
2122
2123   """
2124   try:
2125     hv_type = hypervisor.GetHypervisor(hvname)
2126     hv_type.ValidateParameters(hvparams)
2127     return (True, "Validation passed")
2128   except errors.HypervisorError, err:
2129     return (False, str(err))
2130
2131
2132 def DemoteFromMC():
2133   """Demotes the current node from master candidate role.
2134
2135   """
2136   # try to ensure we're not the master by mistake
2137   master, myself = ssconf.GetMasterAndMyself()
2138   if master == myself:
2139     return (False, "ssconf status shows I'm the master node, will not demote")
2140   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2141   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2142     return (False, "The master daemon is running, will not demote")
2143   try:
2144     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2145   except EnvironmentError, err:
2146     if err.errno != errno.ENOENT:
2147       return (False, "Error while backing up cluster file: %s" % str(err))
2148   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2149   return (True, "Done")
2150
2151
2152 class HooksRunner(object):
2153   """Hook runner.
2154
2155   This class is instantiated on the node side (ganeti-noded) and not
2156   on the master side.
2157
2158   """
2159   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2160
2161   def __init__(self, hooks_base_dir=None):
2162     """Constructor for hooks runner.
2163
2164     @type hooks_base_dir: str or None
2165     @param hooks_base_dir: if not None, this overrides the
2166         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2167
2168     """
2169     if hooks_base_dir is None:
2170       hooks_base_dir = constants.HOOKS_BASE_DIR
2171     self._BASE_DIR = hooks_base_dir
2172
2173   @staticmethod
2174   def ExecHook(script, env):
2175     """Exec one hook script.
2176
2177     @type script: str
2178     @param script: the full path to the script
2179     @type env: dict
2180     @param env: the environment with which to exec the script
2181     @rtype: tuple (success, message)
2182     @return: a tuple of success and message, where success
2183         indicates the succes of the operation, and message
2184         which will contain the error details in case we
2185         failed
2186
2187     """
2188     # exec the process using subprocess and log the output
2189     fdstdin = None
2190     try:
2191       fdstdin = open("/dev/null", "r")
2192       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2193                                stderr=subprocess.STDOUT, close_fds=True,
2194                                shell=False, cwd="/", env=env)
2195       output = ""
2196       try:
2197         output = child.stdout.read(4096)
2198         child.stdout.close()
2199       except EnvironmentError, err:
2200         output += "Hook script error: %s" % str(err)
2201
2202       while True:
2203         try:
2204           result = child.wait()
2205           break
2206         except EnvironmentError, err:
2207           if err.errno == errno.EINTR:
2208             continue
2209           raise
2210     finally:
2211       # try not to leak fds
2212       for fd in (fdstdin, ):
2213         if fd is not None:
2214           try:
2215             fd.close()
2216           except EnvironmentError, err:
2217             # just log the error
2218             #logging.exception("Error while closing fd %s", fd)
2219             pass
2220
2221     return result == 0, output
2222
2223   def RunHooks(self, hpath, phase, env):
2224     """Run the scripts in the hooks directory.
2225
2226     @type hpath: str
2227     @param hpath: the path to the hooks directory which
2228         holds the scripts
2229     @type phase: str
2230     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2231         L{constants.HOOKS_PHASE_POST}
2232     @type env: dict
2233     @param env: dictionary with the environment for the hook
2234     @rtype: list
2235     @return: list of 3-element tuples:
2236       - script path
2237       - script result, either L{constants.HKR_SUCCESS} or
2238         L{constants.HKR_FAIL}
2239       - output of the script
2240
2241     @raise errors.ProgrammerError: for invalid input
2242         parameters
2243
2244     """
2245     if phase == constants.HOOKS_PHASE_PRE:
2246       suffix = "pre"
2247     elif phase == constants.HOOKS_PHASE_POST:
2248       suffix = "post"
2249     else:
2250       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2251     rr = []
2252
2253     subdir = "%s-%s.d" % (hpath, suffix)
2254     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2255     try:
2256       dir_contents = utils.ListVisibleFiles(dir_name)
2257     except OSError, err:
2258       # FIXME: must log output in case of failures
2259       return rr
2260
2261     # we use the standard python sort order,
2262     # so 00name is the recommended naming scheme
2263     dir_contents.sort()
2264     for relname in dir_contents:
2265       fname = os.path.join(dir_name, relname)
2266       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2267           self.RE_MASK.match(relname) is not None):
2268         rrval = constants.HKR_SKIP
2269         output = ""
2270       else:
2271         result, output = self.ExecHook(fname, env)
2272         if not result:
2273           rrval = constants.HKR_FAIL
2274         else:
2275           rrval = constants.HKR_SUCCESS
2276       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2277
2278     return rr
2279
2280
2281 class IAllocatorRunner(object):
2282   """IAllocator runner.
2283
2284   This class is instantiated on the node side (ganeti-noded) and not on
2285   the master side.
2286
2287   """
2288   def Run(self, name, idata):
2289     """Run an iallocator script.
2290
2291     @type name: str
2292     @param name: the iallocator script name
2293     @type idata: str
2294     @param idata: the allocator input data
2295
2296     @rtype: tuple
2297     @return: four element tuple of:
2298        - run status (one of the IARUN_ constants)
2299        - stdout
2300        - stderr
2301        - fail reason (as from L{utils.RunResult})
2302
2303     """
2304     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2305                                   os.path.isfile)
2306     if alloc_script is None:
2307       return (constants.IARUN_NOTFOUND, None, None, None)
2308
2309     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2310     try:
2311       os.write(fd, idata)
2312       os.close(fd)
2313       result = utils.RunCmd([alloc_script, fin_name])
2314       if result.failed:
2315         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2316                 result.fail_reason)
2317     finally:
2318       os.unlink(fin_name)
2319
2320     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2321
2322
2323 class DevCacheManager(object):
2324   """Simple class for managing a cache of block device information.
2325
2326   """
2327   _DEV_PREFIX = "/dev/"
2328   _ROOT_DIR = constants.BDEV_CACHE_DIR
2329
2330   @classmethod
2331   def _ConvertPath(cls, dev_path):
2332     """Converts a /dev/name path to the cache file name.
2333
2334     This replaces slashes with underscores and strips the /dev
2335     prefix. It then returns the full path to the cache file.
2336
2337     @type dev_path: str
2338     @param dev_path: the C{/dev/} path name
2339     @rtype: str
2340     @return: the converted path name
2341
2342     """
2343     if dev_path.startswith(cls._DEV_PREFIX):
2344       dev_path = dev_path[len(cls._DEV_PREFIX):]
2345     dev_path = dev_path.replace("/", "_")
2346     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2347     return fpath
2348
2349   @classmethod
2350   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2351     """Updates the cache information for a given device.
2352
2353     @type dev_path: str
2354     @param dev_path: the pathname of the device
2355     @type owner: str
2356     @param owner: the owner (instance name) of the device
2357     @type on_primary: bool
2358     @param on_primary: whether this is the primary
2359         node nor not
2360     @type iv_name: str
2361     @param iv_name: the instance-visible name of the
2362         device, as in objects.Disk.iv_name
2363
2364     @rtype: None
2365
2366     """
2367     if dev_path is None:
2368       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2369       return
2370     fpath = cls._ConvertPath(dev_path)
2371     if on_primary:
2372       state = "primary"
2373     else:
2374       state = "secondary"
2375     if iv_name is None:
2376       iv_name = "not_visible"
2377     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2378     try:
2379       utils.WriteFile(fpath, data=fdata)
2380     except EnvironmentError, err:
2381       logging.exception("Can't update bdev cache for %s", dev_path)
2382
2383   @classmethod
2384   def RemoveCache(cls, dev_path):
2385     """Remove data for a dev_path.
2386
2387     This is just a wrapper over L{utils.RemoveFile} with a converted
2388     path name and logging.
2389
2390     @type dev_path: str
2391     @param dev_path: the pathname of the device
2392
2393     @rtype: None
2394
2395     """
2396     if dev_path is None:
2397       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2398       return
2399     fpath = cls._ConvertPath(dev_path)
2400     try:
2401       utils.RemoveFile(fpath)
2402     except EnvironmentError, err:
2403       logging.exception("Can't update bdev cache for %s", dev_path)