cleanup: more unused vars
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     logging.exception("Error while processing user ssh files")
264     return False
265
266   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
267     utils.WriteFile(name, data=content, mode=0600)
268
269   utils.AddAuthorizedKey(auth_keys, sshpub)
270
271   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
272
273   return True
274
275
276 def LeaveCluster():
277   """Cleans up and remove the current node.
278
279   This function cleans up and prepares the current node to be removed
280   from the cluster.
281
282   If processing is successful, then it raises an
283   L{errors.QuitGanetiException} which is used as a special case to
284   shutdown the node daemon.
285
286   """
287   _CleanDirectory(constants.DATA_DIR)
288   JobQueuePurge()
289
290   try:
291     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
292   except errors.OpExecError:
293     logging.exception("Error while processing ssh files")
294     return
295
296   f = open(pub_key, 'r')
297   try:
298     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
299   finally:
300     f.close()
301
302   utils.RemoveFile(priv_key)
303   utils.RemoveFile(pub_key)
304
305   # Return a reassuring string to the caller, and quit
306   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
307
308
309 def GetNodeInfo(vgname, hypervisor_type):
310   """Gives back a hash with different informations about the node.
311
312   @type vgname: C{string}
313   @param vgname: the name of the volume group to ask for disk space information
314   @type hypervisor_type: C{str}
315   @param hypervisor_type: the name of the hypervisor to ask for
316       memory information
317   @rtype: C{dict}
318   @return: dictionary with the following keys:
319       - vg_size is the size of the configured volume group in MiB
320       - vg_free is the free size of the volume group in MiB
321       - memory_dom0 is the memory allocated for domain0 in MiB
322       - memory_free is the currently available (free) ram in MiB
323       - memory_total is the total number of ram in MiB
324
325   """
326   outputarray = {}
327   vginfo = _GetVGInfo(vgname)
328   outputarray['vg_size'] = vginfo['vg_size']
329   outputarray['vg_free'] = vginfo['vg_free']
330
331   hyper = hypervisor.GetHypervisor(hypervisor_type)
332   hyp_info = hyper.GetNodeInfo()
333   if hyp_info is not None:
334     outputarray.update(hyp_info)
335
336   f = open("/proc/sys/kernel/random/boot_id", 'r')
337   try:
338     outputarray["bootid"] = f.read(128).rstrip("\n")
339   finally:
340     f.close()
341
342   return outputarray
343
344
345 def VerifyNode(what, cluster_name):
346   """Verify the status of the local node.
347
348   Based on the input L{what} parameter, various checks are done on the
349   local node.
350
351   If the I{filelist} key is present, this list of
352   files is checksummed and the file/checksum pairs are returned.
353
354   If the I{nodelist} key is present, we check that we have
355   connectivity via ssh with the target nodes (and check the hostname
356   report).
357
358   If the I{node-net-test} key is present, we check that we have
359   connectivity to the given nodes via both primary IP and, if
360   applicable, secondary IPs.
361
362   @type what: C{dict}
363   @param what: a dictionary of things to check:
364       - filelist: list of files for which to compute checksums
365       - nodelist: list of nodes we should check ssh communication with
366       - node-net-test: list of nodes we should check node daemon port
367         connectivity with
368       - hypervisor: list with hypervisors to run the verify for
369   @rtype: dict
370   @return: a dictionary with the same keys as the input dict, and
371       values representing the result of the checks
372
373   """
374   result = {}
375
376   if constants.NV_HYPERVISOR in what:
377     result[constants.NV_HYPERVISOR] = tmp = {}
378     for hv_name in what[constants.NV_HYPERVISOR]:
379       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
380
381   if constants.NV_FILELIST in what:
382     result[constants.NV_FILELIST] = utils.FingerprintFiles(
383       what[constants.NV_FILELIST])
384
385   if constants.NV_NODELIST in what:
386     result[constants.NV_NODELIST] = tmp = {}
387     random.shuffle(what[constants.NV_NODELIST])
388     for node in what[constants.NV_NODELIST]:
389       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
390       if not success:
391         tmp[node] = message
392
393   if constants.NV_NODENETTEST in what:
394     result[constants.NV_NODENETTEST] = tmp = {}
395     my_name = utils.HostInfo().name
396     my_pip = my_sip = None
397     for name, pip, sip in what[constants.NV_NODENETTEST]:
398       if name == my_name:
399         my_pip = pip
400         my_sip = sip
401         break
402     if not my_pip:
403       tmp[my_name] = ("Can't find my own primary/secondary IP"
404                       " in the node list")
405     else:
406       port = utils.GetNodeDaemonPort()
407       for name, pip, sip in what[constants.NV_NODENETTEST]:
408         fail = []
409         if not utils.TcpPing(pip, port, source=my_pip):
410           fail.append("primary")
411         if sip != pip:
412           if not utils.TcpPing(sip, port, source=my_sip):
413             fail.append("secondary")
414         if fail:
415           tmp[name] = ("failure using the %s interface(s)" %
416                        " and ".join(fail))
417
418   if constants.NV_LVLIST in what:
419     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
420
421   if constants.NV_INSTANCELIST in what:
422     result[constants.NV_INSTANCELIST] = GetInstanceList(
423       what[constants.NV_INSTANCELIST])
424
425   if constants.NV_VGLIST in what:
426     result[constants.NV_VGLIST] = ListVolumeGroups()
427
428   if constants.NV_VERSION in what:
429     result[constants.NV_VERSION] = constants.PROTOCOL_VERSION
430
431   if constants.NV_HVINFO in what:
432     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
433     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
434
435   return result
436
437
438 def GetVolumeList(vg_name):
439   """Compute list of logical volumes and their size.
440
441   @type vg_name: str
442   @param vg_name: the volume group whose LVs we should list
443   @rtype: dict
444   @return:
445       dictionary of all partions (key) with value being a tuple of
446       their size (in MiB), inactive and online status::
447
448         {'test1': ('20.06', True, True)}
449
450       in case of errors, a string is returned with the error
451       details.
452
453   """
454   lvs = {}
455   sep = '|'
456   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
457                          "--separator=%s" % sep,
458                          "-olv_name,lv_size,lv_attr", vg_name])
459   if result.failed:
460     logging.error("Failed to list logical volumes, lvs output: %s",
461                   result.output)
462     return result.output
463
464   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
465   for line in result.stdout.splitlines():
466     line = line.strip()
467     match = valid_line_re.match(line)
468     if not match:
469       logging.error("Invalid line returned from lvs output: '%s'", line)
470       continue
471     name, size, attr = match.groups()
472     inactive = attr[4] == '-'
473     online = attr[5] == 'o'
474     lvs[name] = (size, inactive, online)
475
476   return lvs
477
478
479 def ListVolumeGroups():
480   """List the volume groups and their size.
481
482   @rtype: dict
483   @return: dictionary with keys volume name and values the
484       size of the volume
485
486   """
487   return utils.ListVolumeGroups()
488
489
490 def NodeVolumes():
491   """List all volumes on this node.
492
493   @rtype: list
494   @return:
495     A list of dictionaries, each having four keys:
496       - name: the logical volume name,
497       - size: the size of the logical volume
498       - dev: the physical device on which the LV lives
499       - vg: the volume group to which it belongs
500
501     In case of errors, we return an empty list and log the
502     error.
503
504     Note that since a logical volume can live on multiple physical
505     volumes, the resulting list might include a logical volume
506     multiple times.
507
508   """
509   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
510                          "--separator=|",
511                          "--options=lv_name,lv_size,devices,vg_name"])
512   if result.failed:
513     logging.error("Failed to list logical volumes, lvs output: %s",
514                   result.output)
515     return []
516
517   def parse_dev(dev):
518     if '(' in dev:
519       return dev.split('(')[0]
520     else:
521       return dev
522
523   def map_line(line):
524     return {
525       'name': line[0].strip(),
526       'size': line[1].strip(),
527       'dev': parse_dev(line[2].strip()),
528       'vg': line[3].strip(),
529     }
530
531   return [map_line(line.split('|')) for line in result.stdout.splitlines()
532           if line.count('|') >= 3]
533
534
535 def BridgesExist(bridges_list):
536   """Check if a list of bridges exist on the current node.
537
538   @rtype: boolean
539   @return: C{True} if all of them exist, C{False} otherwise
540
541   """
542   for bridge in bridges_list:
543     if not utils.BridgeExists(bridge):
544       return False
545
546   return True
547
548
549 def GetInstanceList(hypervisor_list):
550   """Provides a list of instances.
551
552   @type hypervisor_list: list
553   @param hypervisor_list: the list of hypervisors to query information
554
555   @rtype: list
556   @return: a list of all running instances on the current node
557     - instance1.example.com
558     - instance2.example.com
559
560   """
561   results = []
562   for hname in hypervisor_list:
563     try:
564       names = hypervisor.GetHypervisor(hname).ListInstances()
565       results.extend(names)
566     except errors.HypervisorError, err:
567       logging.exception("Error enumerating instances for hypevisor %s", hname)
568       # FIXME: should we somehow not propagate this to the master?
569       raise
570
571   return results
572
573
574 def GetInstanceInfo(instance, hname):
575   """Gives back the informations about an instance as a dictionary.
576
577   @type instance: string
578   @param instance: the instance name
579   @type hname: string
580   @param hname: the hypervisor type of the instance
581
582   @rtype: dict
583   @return: dictionary with the following keys:
584       - memory: memory size of instance (int)
585       - state: xen state of instance (string)
586       - time: cpu time of instance (float)
587
588   """
589   output = {}
590
591   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
592   if iinfo is not None:
593     output['memory'] = iinfo[2]
594     output['state'] = iinfo[4]
595     output['time'] = iinfo[5]
596
597   return output
598
599
600 def GetAllInstancesInfo(hypervisor_list):
601   """Gather data about all instances.
602
603   This is the equivalent of L{GetInstanceInfo}, except that it
604   computes data for all instances at once, thus being faster if one
605   needs data about more than one instance.
606
607   @type hypervisor_list: list
608   @param hypervisor_list: list of hypervisors to query for instance data
609
610   @rtype: dict
611   @return: dictionary of instance: data, with data having the following keys:
612       - memory: memory size of instance (int)
613       - state: xen state of instance (string)
614       - time: cpu time of instance (float)
615       - vcpus: the number of vcpus
616
617   """
618   output = {}
619
620   for hname in hypervisor_list:
621     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
622     if iinfo:
623       for name, inst_id, memory, vcpus, state, times in iinfo:
624         value = {
625           'memory': memory,
626           'vcpus': vcpus,
627           'state': state,
628           'time': times,
629           }
630         if name in output and output[name] != value:
631           raise errors.HypervisorError("Instance %s running duplicate"
632                                        " with different parameters" % name)
633         output[name] = value
634
635   return output
636
637
638 def AddOSToInstance(instance):
639   """Add an OS to an instance.
640
641   @type instance: L{objects.Instance}
642   @param instance: Instance whose OS is to be installed
643   @rtype: boolean
644   @return: the success of the operation
645
646   """
647   inst_os = OSFromDisk(instance.os)
648
649   create_env = OSEnvironment(instance)
650
651   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
652                                      instance.name, int(time.time()))
653
654   result = utils.RunCmd([inst_os.create_script], env=create_env,
655                         cwd=inst_os.path, output=logfile,)
656   if result.failed:
657     logging.error("os create command '%s' returned error: %s, logfile: %s,"
658                   " output: %s", result.cmd, result.fail_reason, logfile,
659                   result.output)
660     return False
661
662   return True
663
664
665 def RunRenameInstance(instance, old_name):
666   """Run the OS rename script for an instance.
667
668   @type instance: L{objects.Instance}
669   @param instance: Instance whose OS is to be installed
670   @type old_name: string
671   @param old_name: previous instance name
672   @rtype: boolean
673   @return: the success of the operation
674
675   """
676   inst_os = OSFromDisk(instance.os)
677
678   rename_env = OSEnvironment(instance)
679   rename_env['OLD_INSTANCE_NAME'] = old_name
680
681   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
682                                            old_name,
683                                            instance.name, int(time.time()))
684
685   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
686                         cwd=inst_os.path, output=logfile)
687
688   if result.failed:
689     logging.error("os create command '%s' returned error: %s output: %s",
690                   result.cmd, result.fail_reason, result.output)
691     return False
692
693   return True
694
695
696 def _GetVGInfo(vg_name):
697   """Get informations about the volume group.
698
699   @type vg_name: str
700   @param vg_name: the volume group which we query
701   @rtype: dict
702   @return:
703     A dictionary with the following keys:
704       - C{vg_size} is the total size of the volume group in MiB
705       - C{vg_free} is the free size of the volume group in MiB
706       - C{pv_count} are the number of physical disks in that VG
707
708     If an error occurs during gathering of data, we return the same dict
709     with keys all set to None.
710
711   """
712   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
713
714   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
715                          "--nosuffix", "--units=m", "--separator=:", vg_name])
716
717   if retval.failed:
718     logging.error("volume group %s not present", vg_name)
719     return retdic
720   valarr = retval.stdout.strip().rstrip(':').split(':')
721   if len(valarr) == 3:
722     try:
723       retdic = {
724         "vg_size": int(round(float(valarr[0]), 0)),
725         "vg_free": int(round(float(valarr[1]), 0)),
726         "pv_count": int(valarr[2]),
727         }
728     except ValueError, err:
729       logging.exception("Fail to parse vgs output")
730   else:
731     logging.error("vgs output has the wrong number of fields (expected"
732                   " three): %s", str(valarr))
733   return retdic
734
735
736 def _GatherBlockDevs(instance):
737   """Set up an instance's block device(s).
738
739   This is run on the primary node at instance startup. The block
740   devices must be already assembled.
741
742   @type instance: L{objects.Instance}
743   @param instance: the instance whose disks we shoul assemble
744   @rtype: list of L{bdev.BlockDev}
745   @return: list of the block devices
746
747   """
748   block_devices = []
749   for disk in instance.disks:
750     device = _RecursiveFindBD(disk)
751     if device is None:
752       raise errors.BlockDeviceError("Block device '%s' is not set up." %
753                                     str(disk))
754     device.Open()
755     block_devices.append((disk, device))
756   return block_devices
757
758
759 def StartInstance(instance, extra_args):
760   """Start an instance.
761
762   @type instance: L{objects.Instance}
763   @param instance: the instance object
764   @rtype: boolean
765   @return: whether the startup was successful or not
766
767   """
768   running_instances = GetInstanceList([instance.hypervisor])
769
770   if instance.name in running_instances:
771     return True
772
773   block_devices = _GatherBlockDevs(instance)
774   hyper = hypervisor.GetHypervisor(instance.hypervisor)
775
776   try:
777     hyper.StartInstance(instance, block_devices, extra_args)
778   except errors.HypervisorError, err:
779     logging.exception("Failed to start instance")
780     return False
781
782   return True
783
784
785 def ShutdownInstance(instance):
786   """Shut an instance down.
787
788   @note: this functions uses polling with a hardcoded timeout.
789
790   @type instance: L{objects.Instance}
791   @param instance: the instance object
792   @rtype: boolean
793   @return: whether the startup was successful or not
794
795   """
796   hv_name = instance.hypervisor
797   running_instances = GetInstanceList([hv_name])
798
799   if instance.name not in running_instances:
800     return True
801
802   hyper = hypervisor.GetHypervisor(hv_name)
803   try:
804     hyper.StopInstance(instance)
805   except errors.HypervisorError, err:
806     logging.error("Failed to stop instance")
807     return False
808
809   # test every 10secs for 2min
810
811   time.sleep(1)
812   for dummy in range(11):
813     if instance.name not in GetInstanceList([hv_name]):
814       break
815     time.sleep(10)
816   else:
817     # the shutdown did not succeed
818     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
819
820     try:
821       hyper.StopInstance(instance, force=True)
822     except errors.HypervisorError, err:
823       logging.exception("Failed to stop instance")
824       return False
825
826     time.sleep(1)
827     if instance.name in GetInstanceList([hv_name]):
828       logging.error("could not shutdown instance '%s' even by destroy",
829                     instance.name)
830       return False
831
832   return True
833
834
835 def RebootInstance(instance, reboot_type, extra_args):
836   """Reboot an instance.
837
838   @type instance: L{objects.Instance}
839   @param instance: the instance object to reboot
840   @type reboot_type: str
841   @param reboot_type: the type of reboot, one the following
842     constants:
843       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
844         instance OS, do not recreate the VM
845       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
846         restart the VM (at the hypervisor level)
847       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
848         is not accepted here, since that mode is handled
849         differently
850   @rtype: boolean
851   @return: the success of the operation
852
853   """
854   running_instances = GetInstanceList([instance.hypervisor])
855
856   if instance.name not in running_instances:
857     logging.error("Cannot reboot instance that is not running")
858     return False
859
860   hyper = hypervisor.GetHypervisor(instance.hypervisor)
861   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
862     try:
863       hyper.RebootInstance(instance)
864     except errors.HypervisorError, err:
865       logging.exception("Failed to soft reboot instance")
866       return False
867   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
868     try:
869       ShutdownInstance(instance)
870       StartInstance(instance, extra_args)
871     except errors.HypervisorError, err:
872       logging.exception("Failed to hard reboot instance")
873       return False
874   else:
875     raise errors.ParameterError("reboot_type invalid")
876
877   return True
878
879
880 def MigrateInstance(instance, target, live):
881   """Migrates an instance to another node.
882
883   @type instance: L{objects.Instance}
884   @param instance: the instance definition
885   @type target: string
886   @param target: the target node name
887   @type live: boolean
888   @param live: whether the migration should be done live or not (the
889       interpretation of this parameter is left to the hypervisor)
890   @rtype: tuple
891   @return: a tuple of (success, msg) where:
892       - succes is a boolean denoting the success/failure of the operation
893       - msg is a string with details in case of failure
894
895   """
896   hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
897
898   try:
899     hyper.MigrateInstance(instance.name, target, live)
900   except errors.HypervisorError, err:
901     msg = "Failed to migrate instance: %s" % str(err)
902     logging.error(msg)
903     return (False, msg)
904   return (True, "Migration successfull")
905
906
907 def CreateBlockDevice(disk, size, owner, on_primary, info):
908   """Creates a block device for an instance.
909
910   @type disk: L{objects.Disk}
911   @param disk: the object describing the disk we should create
912   @type size: int
913   @param size: the size of the physical underlying device, in MiB
914   @type owner: str
915   @param owner: the name of the instance for which disk is created,
916       used for device cache data
917   @type on_primary: boolean
918   @param on_primary:  indicates if it is the primary node or not
919   @type info: string
920   @param info: string that will be sent to the physical device
921       creation, used for example to set (LVM) tags on LVs
922
923   @return: the new unique_id of the device (this can sometime be
924       computed only after creation), or None. On secondary nodes,
925       it's not required to return anything.
926
927   """
928   clist = []
929   if disk.children:
930     for child in disk.children:
931       crdev = _RecursiveAssembleBD(child, owner, on_primary)
932       if on_primary or disk.AssembleOnSecondary():
933         # we need the children open in case the device itself has to
934         # be assembled
935         crdev.Open()
936       clist.append(crdev)
937   try:
938     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
939     if device is not None:
940       logging.info("removing existing device %s", disk)
941       device.Remove()
942   except errors.BlockDeviceError, err:
943     pass
944
945   device = bdev.Create(disk.dev_type, disk.physical_id,
946                        clist, size)
947   if device is None:
948     raise ValueError("Can't create child device for %s, %s" %
949                      (disk, size))
950   if on_primary or disk.AssembleOnSecondary():
951     if not device.Assemble():
952       errorstring = "Can't assemble device after creation"
953       logging.error(errorstring)
954       raise errors.BlockDeviceError("%s, very unusual event - check the node"
955                                     " daemon logs" % errorstring)
956     device.SetSyncSpeed(constants.SYNC_SPEED)
957     if on_primary or disk.OpenOnSecondary():
958       device.Open(force=True)
959     DevCacheManager.UpdateCache(device.dev_path, owner,
960                                 on_primary, disk.iv_name)
961
962   device.SetInfo(info)
963
964   physical_id = device.unique_id
965   return physical_id
966
967
968 def RemoveBlockDevice(disk):
969   """Remove a block device.
970
971   @note: This is intended to be called recursively.
972
973   @type disk: L{objects.Disk}
974   @param disk: the disk object we should remove
975   @rtype: boolean
976   @return: the success of the operation
977
978   """
979   try:
980     # since we are removing the device, allow a partial match
981     # this allows removal of broken mirrors
982     rdev = _RecursiveFindBD(disk, allow_partial=True)
983   except errors.BlockDeviceError, err:
984     # probably can't attach
985     logging.info("Can't attach to device %s in remove", disk)
986     rdev = None
987   if rdev is not None:
988     r_path = rdev.dev_path
989     result = rdev.Remove()
990     if result:
991       DevCacheManager.RemoveCache(r_path)
992   else:
993     result = True
994   if disk.children:
995     for child in disk.children:
996       result = result and RemoveBlockDevice(child)
997   return result
998
999
1000 def _RecursiveAssembleBD(disk, owner, as_primary):
1001   """Activate a block device for an instance.
1002
1003   This is run on the primary and secondary nodes for an instance.
1004
1005   @note: this function is called recursively.
1006
1007   @type disk: L{objects.Disk}
1008   @param disk: the disk we try to assemble
1009   @type owner: str
1010   @param owner: the name of the instance which owns the disk
1011   @type as_primary: boolean
1012   @param as_primary: if we should make the block device
1013       read/write
1014
1015   @return: the assembled device or None (in case no device
1016       was assembled)
1017   @raise errors.BlockDeviceError: in case there is an error
1018       during the activation of the children or the device
1019       itself
1020
1021   """
1022   children = []
1023   if disk.children:
1024     mcn = disk.ChildrenNeeded()
1025     if mcn == -1:
1026       mcn = 0 # max number of Nones allowed
1027     else:
1028       mcn = len(disk.children) - mcn # max number of Nones
1029     for chld_disk in disk.children:
1030       try:
1031         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1032       except errors.BlockDeviceError, err:
1033         if children.count(None) >= mcn:
1034           raise
1035         cdev = None
1036         logging.debug("Error in child activation: %s", str(err))
1037       children.append(cdev)
1038
1039   if as_primary or disk.AssembleOnSecondary():
1040     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
1041     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1042     result = r_dev
1043     if as_primary or disk.OpenOnSecondary():
1044       r_dev.Open()
1045     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1046                                 as_primary, disk.iv_name)
1047
1048   else:
1049     result = True
1050   return result
1051
1052
1053 def AssembleBlockDevice(disk, owner, as_primary):
1054   """Activate a block device for an instance.
1055
1056   This is a wrapper over _RecursiveAssembleBD.
1057
1058   @rtype: str or boolean
1059   @return: a C{/dev/...} path for primary nodes, and
1060       C{True} for secondary nodes
1061
1062   """
1063   result = _RecursiveAssembleBD(disk, owner, as_primary)
1064   if isinstance(result, bdev.BlockDev):
1065     result = result.dev_path
1066   return result
1067
1068
1069 def ShutdownBlockDevice(disk):
1070   """Shut down a block device.
1071
1072   First, if the device is assembled (Attach() is successfull), then
1073   the device is shutdown. Then the children of the device are
1074   shutdown.
1075
1076   This function is called recursively. Note that we don't cache the
1077   children or such, as oppossed to assemble, shutdown of different
1078   devices doesn't require that the upper device was active.
1079
1080   @type disk: L{objects.Disk}
1081   @param disk: the description of the disk we should
1082       shutdown
1083   @rtype: boolean
1084   @return: the success of the operation
1085
1086   """
1087   r_dev = _RecursiveFindBD(disk)
1088   if r_dev is not None:
1089     r_path = r_dev.dev_path
1090     result = r_dev.Shutdown()
1091     if result:
1092       DevCacheManager.RemoveCache(r_path)
1093   else:
1094     result = True
1095   if disk.children:
1096     for child in disk.children:
1097       result = result and ShutdownBlockDevice(child)
1098   return result
1099
1100
1101 def MirrorAddChildren(parent_cdev, new_cdevs):
1102   """Extend a mirrored block device.
1103
1104   @type parent_cdev: L{objects.Disk}
1105   @param parent_cdev: the disk to which we should add children
1106   @type new_cdevs: list of L{objects.Disk}
1107   @param new_cdevs: the list of children which we should add
1108   @rtype: boolean
1109   @return: the success of the operation
1110
1111   """
1112   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
1113   if parent_bdev is None:
1114     logging.error("Can't find parent device")
1115     return False
1116   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1117   if new_bdevs.count(None) > 0:
1118     logging.error("Can't find new device(s) to add: %s:%s",
1119                   new_bdevs, new_cdevs)
1120     return False
1121   parent_bdev.AddChildren(new_bdevs)
1122   return True
1123
1124
1125 def MirrorRemoveChildren(parent_cdev, new_cdevs):
1126   """Shrink a mirrored block device.
1127
1128   @type parent_cdev: L{objects.Disk}
1129   @param parent_cdev: the disk from which we should remove children
1130   @type new_cdevs: list of L{objects.Disk}
1131   @param new_cdevs: the list of children which we should remove
1132   @rtype: boolean
1133   @return: the success of the operation
1134
1135   """
1136   parent_bdev = _RecursiveFindBD(parent_cdev)
1137   if parent_bdev is None:
1138     logging.error("Can't find parent in remove children: %s", parent_cdev)
1139     return False
1140   devs = []
1141   for disk in new_cdevs:
1142     rpath = disk.StaticDevPath()
1143     if rpath is None:
1144       bd = _RecursiveFindBD(disk)
1145       if bd is None:
1146         logging.error("Can't find dynamic device %s while removing children",
1147                       disk)
1148         return False
1149       else:
1150         devs.append(bd.dev_path)
1151     else:
1152       devs.append(rpath)
1153   parent_bdev.RemoveChildren(devs)
1154   return True
1155
1156
1157 def GetMirrorStatus(disks):
1158   """Get the mirroring status of a list of devices.
1159
1160   @type disks: list of L{objects.Disk}
1161   @param disks: the list of disks which we should query
1162   @rtype: disk
1163   @return:
1164       a list of (mirror_done, estimated_time) tuples, which
1165       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1166   @raise errors.BlockDeviceError: if any of the disks cannot be
1167       found
1168
1169   """
1170   stats = []
1171   for dsk in disks:
1172     rbd = _RecursiveFindBD(dsk)
1173     if rbd is None:
1174       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1175     stats.append(rbd.CombinedSyncStatus())
1176   return stats
1177
1178
1179 def _RecursiveFindBD(disk, allow_partial=False):
1180   """Check if a device is activated.
1181
1182   If so, return informations about the real device.
1183
1184   @type disk: L{objects.Disk}
1185   @param disk: the disk object we need to find
1186   @type allow_partial: boolean
1187   @param allow_partial: if true, don't abort the find if a
1188       child of the device can't be found; this is intended
1189       to be used when repairing mirrors
1190
1191   @return: None if the device can't be found,
1192       otherwise the device instance
1193
1194   """
1195   children = []
1196   if disk.children:
1197     for chdisk in disk.children:
1198       children.append(_RecursiveFindBD(chdisk))
1199
1200   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1201
1202
1203 def FindBlockDevice(disk):
1204   """Check if a device is activated.
1205
1206   If it is, return informations about the real device.
1207
1208   @type disk: L{objects.Disk}
1209   @param disk: the disk to find
1210   @rtype: None or tuple
1211   @return: None if the disk cannot be found, otherwise a
1212       tuple (device_path, major, minor, sync_percent,
1213       estimated_time, is_degraded)
1214
1215   """
1216   rbd = _RecursiveFindBD(disk)
1217   if rbd is None:
1218     return rbd
1219   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1220
1221
1222 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1223   """Write a file to the filesystem.
1224
1225   This allows the master to overwrite(!) a file. It will only perform
1226   the operation if the file belongs to a list of configuration files.
1227
1228   @type file_name: str
1229   @param file_name: the target file name
1230   @type data: str
1231   @param data: the new contents of the file
1232   @type mode: int
1233   @param mode: the mode to give the file (can be None)
1234   @type uid: int
1235   @param uid: the owner of the file (can be -1 for default)
1236   @type gid: int
1237   @param gid: the group of the file (can be -1 for default)
1238   @type atime: float
1239   @param atime: the atime to set on the file (can be None)
1240   @type mtime: float
1241   @param mtime: the mtime to set on the file (can be None)
1242   @rtype: boolean
1243   @return: the success of the operation; errors are logged
1244       in the node daemon log
1245
1246   """
1247   if not os.path.isabs(file_name):
1248     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1249                   file_name)
1250     return False
1251
1252   allowed_files = [
1253     constants.CLUSTER_CONF_FILE,
1254     constants.ETC_HOSTS,
1255     constants.SSH_KNOWN_HOSTS_FILE,
1256     constants.VNC_PASSWORD_FILE,
1257     ]
1258
1259   if file_name not in allowed_files:
1260     logging.error("Filename passed to UploadFile not in allowed"
1261                  " upload targets: '%s'", file_name)
1262     return False
1263
1264   raw_data = _Decompress(data)
1265
1266   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1267                   atime=atime, mtime=mtime)
1268   return True
1269
1270
1271 def WriteSsconfFiles(values):
1272   """Update all ssconf files.
1273
1274   Wrapper around the SimpleStore.WriteFiles.
1275
1276   """
1277   ssconf.SimpleStore().WriteFiles(values)
1278
1279
1280 def _ErrnoOrStr(err):
1281   """Format an EnvironmentError exception.
1282
1283   If the L{err} argument has an errno attribute, it will be looked up
1284   and converted into a textual C{E...} description. Otherwise the
1285   string representation of the error will be returned.
1286
1287   @type err: L{EnvironmentError}
1288   @param err: the exception to format
1289
1290   """
1291   if hasattr(err, 'errno'):
1292     detail = errno.errorcode[err.errno]
1293   else:
1294     detail = str(err)
1295   return detail
1296
1297
1298 def _OSOndiskVersion(name, os_dir):
1299   """Compute and return the API version of a given OS.
1300
1301   This function will try to read the API version of the OS given by
1302   the 'name' parameter and residing in the 'os_dir' directory.
1303
1304   @type name: str
1305   @param name: the OS name we should look for
1306   @type os_dir: str
1307   @param os_dir: the directory inwhich we should look for the OS
1308   @rtype: int or None
1309   @return:
1310       Either an integer denoting the version or None in the
1311       case when this is not a valid OS name.
1312   @raise errors.InvalidOS: if the OS cannot be found
1313
1314   """
1315   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1316
1317   try:
1318     st = os.stat(api_file)
1319   except EnvironmentError, err:
1320     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1321                            " found (%s)" % _ErrnoOrStr(err))
1322
1323   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1324     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1325                            " a regular file")
1326
1327   try:
1328     f = open(api_file)
1329     try:
1330       api_versions = f.readlines()
1331     finally:
1332       f.close()
1333   except EnvironmentError, err:
1334     raise errors.InvalidOS(name, os_dir, "error while reading the"
1335                            " API version (%s)" % _ErrnoOrStr(err))
1336
1337   api_versions = [version.strip() for version in api_versions]
1338   try:
1339     api_versions = [int(version) for version in api_versions]
1340   except (TypeError, ValueError), err:
1341     raise errors.InvalidOS(name, os_dir,
1342                            "API version is not integer (%s)" % str(err))
1343
1344   return api_versions
1345
1346
1347 def DiagnoseOS(top_dirs=None):
1348   """Compute the validity for all OSes.
1349
1350   @type top_dirs: list
1351   @param top_dirs: the list of directories in which to
1352       search (if not given defaults to
1353       L{constants.OS_SEARCH_PATH})
1354   @rtype: list of L{objects.OS}
1355   @return: an OS object for each name in all the given
1356       directories
1357
1358   """
1359   if top_dirs is None:
1360     top_dirs = constants.OS_SEARCH_PATH
1361
1362   result = []
1363   for dir_name in top_dirs:
1364     if os.path.isdir(dir_name):
1365       try:
1366         f_names = utils.ListVisibleFiles(dir_name)
1367       except EnvironmentError, err:
1368         logging.exception("Can't list the OS directory %s", dir_name)
1369         break
1370       for name in f_names:
1371         try:
1372           os_inst = OSFromDisk(name, base_dir=dir_name)
1373           result.append(os_inst)
1374         except errors.InvalidOS, err:
1375           result.append(objects.OS.FromInvalidOS(err))
1376
1377   return result
1378
1379
1380 def OSFromDisk(name, base_dir=None):
1381   """Create an OS instance from disk.
1382
1383   This function will return an OS instance if the given name is a
1384   valid OS name. Otherwise, it will raise an appropriate
1385   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1386
1387   @type base_dir: string
1388   @keyword base_dir: Base directory containing OS installations.
1389                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1390   @rtype: L{objects.OS}
1391   @return: the OS instance if we find a valid one
1392   @raise errors.InvalidOS: if we don't find a valid OS
1393
1394   """
1395   if base_dir is None:
1396     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1397     if os_dir is None:
1398       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1399   else:
1400     os_dir = os.path.sep.join([base_dir, name])
1401
1402   api_versions = _OSOndiskVersion(name, os_dir)
1403
1404   if constants.OS_API_VERSION not in api_versions:
1405     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1406                            " (found %s want %s)"
1407                            % (api_versions, constants.OS_API_VERSION))
1408
1409   # OS Scripts dictionary, we will populate it with the actual script names
1410   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1411
1412   for script in os_scripts:
1413     os_scripts[script] = os.path.sep.join([os_dir, script])
1414
1415     try:
1416       st = os.stat(os_scripts[script])
1417     except EnvironmentError, err:
1418       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1419                              (script, _ErrnoOrStr(err)))
1420
1421     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1422       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1423                              script)
1424
1425     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1426       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1427                              script)
1428
1429
1430   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1431                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1432                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1433                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1434                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1435                     api_versions=api_versions)
1436
1437 def OSEnvironment(instance, debug=0):
1438   """Calculate the environment for an os script.
1439
1440   @type instance: L{objects.Instance}
1441   @param instance: target instance for the os script run
1442   @type debug: integer
1443   @param debug: debug level (0 or 1, for OS Api 10)
1444   @rtype: dict
1445   @return: dict of environment variables
1446   @raise errors.BlockDeviceError: if the block device
1447       cannot be found
1448
1449   """
1450   result = {}
1451   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1452   result['INSTANCE_NAME'] = instance.name
1453   result['HYPERVISOR'] = instance.hypervisor
1454   result['DISK_COUNT'] = '%d' % len(instance.disks)
1455   result['NIC_COUNT'] = '%d' % len(instance.nics)
1456   result['DEBUG_LEVEL'] = '%d' % debug
1457   for idx, disk in enumerate(instance.disks):
1458     real_disk = _RecursiveFindBD(disk)
1459     if real_disk is None:
1460       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1461                                     str(disk))
1462     real_disk.Open()
1463     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1464     # FIXME: When disks will have read-only mode, populate this
1465     result['DISK_%d_ACCESS' % idx] = 'W'
1466     if constants.HV_DISK_TYPE in instance.hvparams:
1467       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1468         instance.hvparams[constants.HV_DISK_TYPE]
1469     if disk.dev_type in constants.LDS_BLOCK:
1470       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1471     elif disk.dev_type == constants.LD_FILE:
1472       result['DISK_%d_BACKEND_TYPE' % idx] = \
1473         'file:%s' % disk.physical_id[0]
1474   for idx, nic in enumerate(instance.nics):
1475     result['NIC_%d_MAC' % idx] = nic.mac
1476     if nic.ip:
1477       result['NIC_%d_IP' % idx] = nic.ip
1478     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1479     if constants.HV_NIC_TYPE in instance.hvparams:
1480       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1481         instance.hvparams[constants.HV_NIC_TYPE]
1482
1483   return result
1484
1485 def GrowBlockDevice(disk, amount):
1486   """Grow a stack of block devices.
1487
1488   This function is called recursively, with the childrens being the
1489   first ones to resize.
1490
1491   @type disk: L{objects.Disk}
1492   @param disk: the disk to be grown
1493   @rtype: (status, result)
1494   @return: a tuple with the status of the operation
1495       (True/False), and the errors message if status
1496       is False
1497
1498   """
1499   r_dev = _RecursiveFindBD(disk)
1500   if r_dev is None:
1501     return False, "Cannot find block device %s" % (disk,)
1502
1503   try:
1504     r_dev.Grow(amount)
1505   except errors.BlockDeviceError, err:
1506     return False, str(err)
1507
1508   return True, None
1509
1510
1511 def SnapshotBlockDevice(disk):
1512   """Create a snapshot copy of a block device.
1513
1514   This function is called recursively, and the snapshot is actually created
1515   just for the leaf lvm backend device.
1516
1517   @type disk: L{objects.Disk}
1518   @param disk: the disk to be snapshotted
1519   @rtype: string
1520   @return: snapshot disk path
1521
1522   """
1523   if disk.children:
1524     if len(disk.children) == 1:
1525       # only one child, let's recurse on it
1526       return SnapshotBlockDevice(disk.children[0])
1527     else:
1528       # more than one child, choose one that matches
1529       for child in disk.children:
1530         if child.size == disk.size:
1531           # return implies breaking the loop
1532           return SnapshotBlockDevice(child)
1533   elif disk.dev_type == constants.LD_LV:
1534     r_dev = _RecursiveFindBD(disk)
1535     if r_dev is not None:
1536       # let's stay on the safe side and ask for the full size, for now
1537       return r_dev.Snapshot(disk.size)
1538     else:
1539       return None
1540   else:
1541     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1542                                  " '%s' of type '%s'" %
1543                                  (disk.unique_id, disk.dev_type))
1544
1545
1546 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1547   """Export a block device snapshot to a remote node.
1548
1549   @type disk: L{objects.Disk}
1550   @param disk: the description of the disk to export
1551   @type dest_node: str
1552   @param dest_node: the destination node to export to
1553   @type instance: L{objects.Instance}
1554   @param instance: the instance object to whom the disk belongs
1555   @type cluster_name: str
1556   @param cluster_name: the cluster name, needed for SSH hostalias
1557   @type idx: int
1558   @param idx: the index of the disk in the instance's disk list,
1559       used to export to the OS scripts environment
1560   @rtype: boolean
1561   @return: the success of the operation
1562
1563   """
1564   export_env = OSEnvironment(instance)
1565
1566   inst_os = OSFromDisk(instance.os)
1567   export_script = inst_os.export_script
1568
1569   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1570                                      instance.name, int(time.time()))
1571   if not os.path.exists(constants.LOG_OS_DIR):
1572     os.mkdir(constants.LOG_OS_DIR, 0750)
1573   real_disk = _RecursiveFindBD(disk)
1574   if real_disk is None:
1575     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1576                                   str(disk))
1577   real_disk.Open()
1578
1579   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1580   export_env['EXPORT_INDEX'] = str(idx)
1581
1582   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1583   destfile = disk.physical_id[1]
1584
1585   # the target command is built out of three individual commands,
1586   # which are joined by pipes; we check each individual command for
1587   # valid parameters
1588   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1589                                export_script, logfile)
1590
1591   comprcmd = "gzip"
1592
1593   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1594                                 destdir, destdir, destfile)
1595   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1596                                                    constants.GANETI_RUNAS,
1597                                                    destcmd)
1598
1599   # all commands have been checked, so we're safe to combine them
1600   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1601
1602   result = utils.RunCmd(command, env=export_env)
1603
1604   if result.failed:
1605     logging.error("os snapshot export command '%s' returned error: %s"
1606                   " output: %s", command, result.fail_reason, result.output)
1607     return False
1608
1609   return True
1610
1611
1612 def FinalizeExport(instance, snap_disks):
1613   """Write out the export configuration information.
1614
1615   @type instance: L{objects.Instance}
1616   @param instance: the instance which we export, used for
1617       saving configuration
1618   @type snap_disks: list of L{objects.Disk}
1619   @param snap_disks: list of snapshot block devices, which
1620       will be used to get the actual name of the dump file
1621
1622   @rtype: boolean
1623   @return: the success of the operation
1624
1625   """
1626   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1627   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1628
1629   config = objects.SerializableConfigParser()
1630
1631   config.add_section(constants.INISECT_EXP)
1632   config.set(constants.INISECT_EXP, 'version', '0')
1633   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1634   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1635   config.set(constants.INISECT_EXP, 'os', instance.os)
1636   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1637
1638   config.add_section(constants.INISECT_INS)
1639   config.set(constants.INISECT_INS, 'name', instance.name)
1640   config.set(constants.INISECT_INS, 'memory', '%d' %
1641              instance.beparams[constants.BE_MEMORY])
1642   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1643              instance.beparams[constants.BE_VCPUS])
1644   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1645
1646   nic_count = 0
1647   for nic_count, nic in enumerate(instance.nics):
1648     config.set(constants.INISECT_INS, 'nic%d_mac' %
1649                nic_count, '%s' % nic.mac)
1650     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1651     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1652                '%s' % nic.bridge)
1653   # TODO: redundant: on load can read nics until it doesn't exist
1654   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1655
1656   disk_total = 0
1657   for disk_count, disk in enumerate(snap_disks):
1658     if disk:
1659       disk_total += 1
1660       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1661                  ('%s' % disk.iv_name))
1662       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1663                  ('%s' % disk.physical_id[1]))
1664       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1665                  ('%d' % disk.size))
1666
1667   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1668
1669   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1670                   data=config.Dumps())
1671   shutil.rmtree(finaldestdir, True)
1672   shutil.move(destdir, finaldestdir)
1673
1674   return True
1675
1676
1677 def ExportInfo(dest):
1678   """Get export configuration information.
1679
1680   @type dest: str
1681   @param dest: directory containing the export
1682
1683   @rtype: L{objects.SerializableConfigParser}
1684   @return: a serializable config file containing the
1685       export info
1686
1687   """
1688   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1689
1690   config = objects.SerializableConfigParser()
1691   config.read(cff)
1692
1693   if (not config.has_section(constants.INISECT_EXP) or
1694       not config.has_section(constants.INISECT_INS)):
1695     return None
1696
1697   return config
1698
1699
1700 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1701   """Import an os image into an instance.
1702
1703   @type instance: L{objects.Instance}
1704   @param instance: instance to import the disks into
1705   @type src_node: string
1706   @param src_node: source node for the disk images
1707   @type src_images: list of string
1708   @param src_images: absolute paths of the disk images
1709   @rtype: list of boolean
1710   @return: each boolean represent the success of importing the n-th disk
1711
1712   """
1713   import_env = OSEnvironment(instance)
1714   inst_os = OSFromDisk(instance.os)
1715   import_script = inst_os.import_script
1716
1717   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1718                                         instance.name, int(time.time()))
1719   if not os.path.exists(constants.LOG_OS_DIR):
1720     os.mkdir(constants.LOG_OS_DIR, 0750)
1721
1722   comprcmd = "gunzip"
1723   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1724                                import_script, logfile)
1725
1726   final_result = []
1727   for idx, image in enumerate(src_images):
1728     if image:
1729       destcmd = utils.BuildShellCmd('cat %s', image)
1730       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1731                                                        constants.GANETI_RUNAS,
1732                                                        destcmd)
1733       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1734       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1735       import_env['IMPORT_INDEX'] = str(idx)
1736       result = utils.RunCmd(command, env=import_env)
1737       if result.failed:
1738         logging.error("Disk import command '%s' returned error: %s"
1739                       " output: %s", command, result.fail_reason,
1740                       result.output)
1741         final_result.append(False)
1742       else:
1743         final_result.append(True)
1744     else:
1745       final_result.append(True)
1746
1747   return final_result
1748
1749
1750 def ListExports():
1751   """Return a list of exports currently available on this machine.
1752
1753   @rtype: list
1754   @return: list of the exports
1755
1756   """
1757   if os.path.isdir(constants.EXPORT_DIR):
1758     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1759   else:
1760     return []
1761
1762
1763 def RemoveExport(export):
1764   """Remove an existing export from the node.
1765
1766   @type export: str
1767   @param export: the name of the export to remove
1768   @rtype: boolean
1769   @return: the success of the operation
1770
1771   """
1772   target = os.path.join(constants.EXPORT_DIR, export)
1773
1774   shutil.rmtree(target)
1775   # TODO: catch some of the relevant exceptions and provide a pretty
1776   # error message if rmtree fails.
1777
1778   return True
1779
1780
1781 def RenameBlockDevices(devlist):
1782   """Rename a list of block devices.
1783
1784   @type devlist: list of tuples
1785   @param devlist: list of tuples of the form  (disk,
1786       new_logical_id, new_physical_id); disk is an
1787       L{objects.Disk} object describing the current disk,
1788       and new logical_id/physical_id is the name we
1789       rename it to
1790   @rtype: boolean
1791   @return: True if all renames succeeded, False otherwise
1792
1793   """
1794   result = True
1795   for disk, unique_id in devlist:
1796     dev = _RecursiveFindBD(disk)
1797     if dev is None:
1798       result = False
1799       continue
1800     try:
1801       old_rpath = dev.dev_path
1802       dev.Rename(unique_id)
1803       new_rpath = dev.dev_path
1804       if old_rpath != new_rpath:
1805         DevCacheManager.RemoveCache(old_rpath)
1806         # FIXME: we should add the new cache information here, like:
1807         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1808         # but we don't have the owner here - maybe parse from existing
1809         # cache? for now, we only lose lvm data when we rename, which
1810         # is less critical than DRBD or MD
1811     except errors.BlockDeviceError, err:
1812       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1813       result = False
1814   return result
1815
1816
1817 def _TransformFileStorageDir(file_storage_dir):
1818   """Checks whether given file_storage_dir is valid.
1819
1820   Checks wheter the given file_storage_dir is within the cluster-wide
1821   default file_storage_dir stored in SimpleStore. Only paths under that
1822   directory are allowed.
1823
1824   @type file_storage_dir: str
1825   @param file_storage_dir: the path to check
1826
1827   @return: the normalized path if valid, None otherwise
1828
1829   """
1830   cfg = _GetConfig()
1831   file_storage_dir = os.path.normpath(file_storage_dir)
1832   base_file_storage_dir = cfg.GetFileStorageDir()
1833   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1834       base_file_storage_dir):
1835     logging.error("file storage directory '%s' is not under base file"
1836                   " storage directory '%s'",
1837                   file_storage_dir, base_file_storage_dir)
1838     return None
1839   return file_storage_dir
1840
1841
1842 def CreateFileStorageDir(file_storage_dir):
1843   """Create file storage directory.
1844
1845   @type file_storage_dir: str
1846   @param file_storage_dir: directory to create
1847
1848   @rtype: tuple
1849   @return: tuple with first element a boolean indicating wheter dir
1850       creation was successful or not
1851
1852   """
1853   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1854   result = True,
1855   if not file_storage_dir:
1856     result = False,
1857   else:
1858     if os.path.exists(file_storage_dir):
1859       if not os.path.isdir(file_storage_dir):
1860         logging.error("'%s' is not a directory", file_storage_dir)
1861         result = False,
1862     else:
1863       try:
1864         os.makedirs(file_storage_dir, 0750)
1865       except OSError, err:
1866         logging.error("Cannot create file storage directory '%s': %s",
1867                       file_storage_dir, err)
1868         result = False,
1869   return result
1870
1871
1872 def RemoveFileStorageDir(file_storage_dir):
1873   """Remove file storage directory.
1874
1875   Remove it only if it's empty. If not log an error and return.
1876
1877   @type file_storage_dir: str
1878   @param file_storage_dir: the directory we should cleanup
1879   @rtype: tuple (success,)
1880   @return: tuple of one element, C{success}, denoting
1881       whether the operation was successfull
1882
1883   """
1884   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1885   result = True,
1886   if not file_storage_dir:
1887     result = False,
1888   else:
1889     if os.path.exists(file_storage_dir):
1890       if not os.path.isdir(file_storage_dir):
1891         logging.error("'%s' is not a directory", file_storage_dir)
1892         result = False,
1893       # deletes dir only if empty, otherwise we want to return False
1894       try:
1895         os.rmdir(file_storage_dir)
1896       except OSError, err:
1897         logging.exception("Cannot remove file storage directory '%s'",
1898                           file_storage_dir)
1899         result = False,
1900   return result
1901
1902
1903 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1904   """Rename the file storage directory.
1905
1906   @type old_file_storage_dir: str
1907   @param old_file_storage_dir: the current path
1908   @type new_file_storage_dir: str
1909   @param new_file_storage_dir: the name we should rename to
1910   @rtype: tuple (success,)
1911   @return: tuple of one element, C{success}, denoting
1912       whether the operation was successful
1913
1914   """
1915   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1916   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1917   result = True,
1918   if not old_file_storage_dir or not new_file_storage_dir:
1919     result = False,
1920   else:
1921     if not os.path.exists(new_file_storage_dir):
1922       if os.path.isdir(old_file_storage_dir):
1923         try:
1924           os.rename(old_file_storage_dir, new_file_storage_dir)
1925         except OSError, err:
1926           logging.exception("Cannot rename '%s' to '%s'",
1927                             old_file_storage_dir, new_file_storage_dir)
1928           result =  False,
1929       else:
1930         logging.error("'%s' is not a directory", old_file_storage_dir)
1931         result = False,
1932     else:
1933       if os.path.exists(old_file_storage_dir):
1934         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1935                       old_file_storage_dir, new_file_storage_dir)
1936         result = False,
1937   return result
1938
1939
1940 def _IsJobQueueFile(file_name):
1941   """Checks whether the given filename is in the queue directory.
1942
1943   @type file_name: str
1944   @param file_name: the file name we should check
1945   @rtype: boolean
1946   @return: whether the file is under the queue directory
1947
1948   """
1949   queue_dir = os.path.normpath(constants.QUEUE_DIR)
1950   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
1951
1952   if not result:
1953     logging.error("'%s' is not a file in the queue directory",
1954                   file_name)
1955
1956   return result
1957
1958
1959 def JobQueueUpdate(file_name, content):
1960   """Updates a file in the queue directory.
1961
1962   This is just a wrapper over L{utils.WriteFile}, with proper
1963   checking.
1964
1965   @type file_name: str
1966   @param file_name: the job file name
1967   @type content: str
1968   @param content: the new job contents
1969   @rtype: boolean
1970   @return: the success of the operation
1971
1972   """
1973   if not _IsJobQueueFile(file_name):
1974     return False
1975
1976   # Write and replace the file atomically
1977   utils.WriteFile(file_name, data=_Decompress(content))
1978
1979   return True
1980
1981
1982 def JobQueueRename(old, new):
1983   """Renames a job queue file.
1984
1985   This is just a wrapper over os.rename with proper checking.
1986
1987   @type old: str
1988   @param old: the old (actual) file name
1989   @type new: str
1990   @param new: the desired file name
1991   @rtype: boolean
1992   @return: the success of the operation
1993
1994   """
1995   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
1996     return False
1997
1998   os.rename(old, new)
1999
2000   return True
2001
2002
2003 def JobQueueSetDrainFlag(drain_flag):
2004   """Set the drain flag for the queue.
2005
2006   This will set or unset the queue drain flag.
2007
2008   @type drain_flag: boolean
2009   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2010   @rtype: boolean
2011   @return: always True
2012   @warning: the function always returns True
2013
2014   """
2015   if drain_flag:
2016     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2017   else:
2018     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2019
2020   return True
2021
2022
2023 def CloseBlockDevices(disks):
2024   """Closes the given block devices.
2025
2026   This means they will be switched to secondary mode (in case of
2027   DRBD).
2028
2029   @type disks: list of L{objects.Disk}
2030   @param disks: the list of disks to be closed
2031   @rtype: tuple (success, message)
2032   @return: a tuple of success and message, where success
2033       indicates the succes of the operation, and message
2034       which will contain the error details in case we
2035       failed
2036
2037   """
2038   bdevs = []
2039   for cf in disks:
2040     rd = _RecursiveFindBD(cf)
2041     if rd is None:
2042       return (False, "Can't find device %s" % cf)
2043     bdevs.append(rd)
2044
2045   msg = []
2046   for rd in bdevs:
2047     try:
2048       rd.Close()
2049     except errors.BlockDeviceError, err:
2050       msg.append(str(err))
2051   if msg:
2052     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2053   else:
2054     return (True, "All devices secondary")
2055
2056
2057 def ValidateHVParams(hvname, hvparams):
2058   """Validates the given hypervisor parameters.
2059
2060   @type hvname: string
2061   @param hvname: the hypervisor name
2062   @type hvparams: dict
2063   @param hvparams: the hypervisor parameters to be validated
2064   @rtype: tuple (success, message)
2065   @return: a tuple of success and message, where success
2066       indicates the succes of the operation, and message
2067       which will contain the error details in case we
2068       failed
2069
2070   """
2071   try:
2072     hv_type = hypervisor.GetHypervisor(hvname)
2073     hv_type.ValidateParameters(hvparams)
2074     return (True, "Validation passed")
2075   except errors.HypervisorError, err:
2076     return (False, str(err))
2077
2078
2079 def DemoteFromMC():
2080   """Demotes the current node from master candidate role.
2081
2082   """
2083   # try to ensure we're not the master by mistake
2084   master, myself = ssconf.GetMasterAndMyself()
2085   if master == myself:
2086     return (False, "ssconf status shows I'm the master node, will not demote")
2087   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2088   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2089     return (False, "The master daemon is running, will not demote")
2090   try:
2091     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2092   except EnvironmentError, err:
2093     if err.errno != errno.ENOENT:
2094       return (False, "Error while backing up cluster file: %s" % str(err))
2095   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2096   return (True, "Done")
2097
2098
2099 class HooksRunner(object):
2100   """Hook runner.
2101
2102   This class is instantiated on the node side (ganeti-noded) and not
2103   on the master side.
2104
2105   """
2106   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2107
2108   def __init__(self, hooks_base_dir=None):
2109     """Constructor for hooks runner.
2110
2111     @type hooks_base_dir: str or None
2112     @param hooks_base_dir: if not None, this overrides the
2113         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2114
2115     """
2116     if hooks_base_dir is None:
2117       hooks_base_dir = constants.HOOKS_BASE_DIR
2118     self._BASE_DIR = hooks_base_dir
2119
2120   @staticmethod
2121   def ExecHook(script, env):
2122     """Exec one hook script.
2123
2124     @type script: str
2125     @param script: the full path to the script
2126     @type env: dict
2127     @param env: the environment with which to exec the script
2128     @rtype: tuple (success, message)
2129     @return: a tuple of success and message, where success
2130         indicates the succes of the operation, and message
2131         which will contain the error details in case we
2132         failed
2133
2134     """
2135     # exec the process using subprocess and log the output
2136     fdstdin = None
2137     try:
2138       fdstdin = open("/dev/null", "r")
2139       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2140                                stderr=subprocess.STDOUT, close_fds=True,
2141                                shell=False, cwd="/", env=env)
2142       output = ""
2143       try:
2144         output = child.stdout.read(4096)
2145         child.stdout.close()
2146       except EnvironmentError, err:
2147         output += "Hook script error: %s" % str(err)
2148
2149       while True:
2150         try:
2151           result = child.wait()
2152           break
2153         except EnvironmentError, err:
2154           if err.errno == errno.EINTR:
2155             continue
2156           raise
2157     finally:
2158       # try not to leak fds
2159       for fd in (fdstdin, ):
2160         if fd is not None:
2161           try:
2162             fd.close()
2163           except EnvironmentError, err:
2164             # just log the error
2165             #logging.exception("Error while closing fd %s", fd)
2166             pass
2167
2168     return result == 0, output
2169
2170   def RunHooks(self, hpath, phase, env):
2171     """Run the scripts in the hooks directory.
2172
2173     @type hpath: str
2174     @param hpath: the path to the hooks directory which
2175         holds the scripts
2176     @type phase: str
2177     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2178         L{constants.HOOKS_PHASE_POST}
2179     @type env: dict
2180     @param env: dictionary with the environment for the hook
2181     @rtype: list
2182     @return: list of 3-element tuples:
2183       - script path
2184       - script result, either L{constants.HKR_SUCCESS} or
2185         L{constants.HKR_FAIL}
2186       - output of the script
2187
2188     @raise errors.ProgrammerError: for invalid input
2189         parameters
2190
2191     """
2192     if phase == constants.HOOKS_PHASE_PRE:
2193       suffix = "pre"
2194     elif phase == constants.HOOKS_PHASE_POST:
2195       suffix = "post"
2196     else:
2197       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2198     rr = []
2199
2200     subdir = "%s-%s.d" % (hpath, suffix)
2201     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2202     try:
2203       dir_contents = utils.ListVisibleFiles(dir_name)
2204     except OSError, err:
2205       # FIXME: must log output in case of failures
2206       return rr
2207
2208     # we use the standard python sort order,
2209     # so 00name is the recommended naming scheme
2210     dir_contents.sort()
2211     for relname in dir_contents:
2212       fname = os.path.join(dir_name, relname)
2213       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2214           self.RE_MASK.match(relname) is not None):
2215         rrval = constants.HKR_SKIP
2216         output = ""
2217       else:
2218         result, output = self.ExecHook(fname, env)
2219         if not result:
2220           rrval = constants.HKR_FAIL
2221         else:
2222           rrval = constants.HKR_SUCCESS
2223       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2224
2225     return rr
2226
2227
2228 class IAllocatorRunner(object):
2229   """IAllocator runner.
2230
2231   This class is instantiated on the node side (ganeti-noded) and not on
2232   the master side.
2233
2234   """
2235   def Run(self, name, idata):
2236     """Run an iallocator script.
2237
2238     @type name: str
2239     @param name: the iallocator script name
2240     @type idata: str
2241     @param idata: the allocator input data
2242
2243     @rtype: tuple
2244     @return: four element tuple of:
2245        - run status (one of the IARUN_ constants)
2246        - stdout
2247        - stderr
2248        - fail reason (as from L{utils.RunResult})
2249
2250     """
2251     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2252                                   os.path.isfile)
2253     if alloc_script is None:
2254       return (constants.IARUN_NOTFOUND, None, None, None)
2255
2256     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2257     try:
2258       os.write(fd, idata)
2259       os.close(fd)
2260       result = utils.RunCmd([alloc_script, fin_name])
2261       if result.failed:
2262         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2263                 result.fail_reason)
2264     finally:
2265       os.unlink(fin_name)
2266
2267     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2268
2269
2270 class DevCacheManager(object):
2271   """Simple class for managing a cache of block device information.
2272
2273   """
2274   _DEV_PREFIX = "/dev/"
2275   _ROOT_DIR = constants.BDEV_CACHE_DIR
2276
2277   @classmethod
2278   def _ConvertPath(cls, dev_path):
2279     """Converts a /dev/name path to the cache file name.
2280
2281     This replaces slashes with underscores and strips the /dev
2282     prefix. It then returns the full path to the cache file.
2283
2284     @type dev_path: str
2285     @param dev_path: the C{/dev/} path name
2286     @rtype: str
2287     @return: the converted path name
2288
2289     """
2290     if dev_path.startswith(cls._DEV_PREFIX):
2291       dev_path = dev_path[len(cls._DEV_PREFIX):]
2292     dev_path = dev_path.replace("/", "_")
2293     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2294     return fpath
2295
2296   @classmethod
2297   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2298     """Updates the cache information for a given device.
2299
2300     @type dev_path: str
2301     @param dev_path: the pathname of the device
2302     @type owner: str
2303     @param owner: the owner (instance name) of the device
2304     @type on_primary: bool
2305     @param on_primary: whether this is the primary
2306         node nor not
2307     @type iv_name: str
2308     @param iv_name: the instance-visible name of the
2309         device, as in objects.Disk.iv_name
2310
2311     @rtype: None
2312
2313     """
2314     if dev_path is None:
2315       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2316       return
2317     fpath = cls._ConvertPath(dev_path)
2318     if on_primary:
2319       state = "primary"
2320     else:
2321       state = "secondary"
2322     if iv_name is None:
2323       iv_name = "not_visible"
2324     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2325     try:
2326       utils.WriteFile(fpath, data=fdata)
2327     except EnvironmentError, err:
2328       logging.exception("Can't update bdev cache for %s", dev_path)
2329
2330   @classmethod
2331   def RemoveCache(cls, dev_path):
2332     """Remove data for a dev_path.
2333
2334     This is just a wrapper over L{utils.RemoveFile} with a converted
2335     path name and logging.
2336
2337     @type dev_path: str
2338     @param dev_path: the pathname of the device
2339
2340     @rtype: None
2341
2342     """
2343     if dev_path is None:
2344       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2345       return
2346     fpath = cls._ConvertPath(dev_path)
2347     try:
2348       utils.RemoveFile(fpath)
2349     except EnvironmentError, err:
2350       logging.exception("Can't update bdev cache for %s", dev_path)