Simplify hypervisor block_devices structure
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     logging.exception("Error while processing user ssh files")
264     return False
265
266   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
267     utils.WriteFile(name, data=content, mode=0600)
268
269   utils.AddAuthorizedKey(auth_keys, sshpub)
270
271   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
272
273   return True
274
275
276 def LeaveCluster():
277   """Cleans up and remove the current node.
278
279   This function cleans up and prepares the current node to be removed
280   from the cluster.
281
282   If processing is successful, then it raises an
283   L{errors.QuitGanetiException} which is used as a special case to
284   shutdown the node daemon.
285
286   """
287   _CleanDirectory(constants.DATA_DIR)
288   JobQueuePurge()
289
290   try:
291     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
292   except errors.OpExecError:
293     logging.exception("Error while processing ssh files")
294     return
295
296   f = open(pub_key, 'r')
297   try:
298     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
299   finally:
300     f.close()
301
302   utils.RemoveFile(priv_key)
303   utils.RemoveFile(pub_key)
304
305   # Return a reassuring string to the caller, and quit
306   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
307
308
309 def GetNodeInfo(vgname, hypervisor_type):
310   """Gives back a hash with different informations about the node.
311
312   @type vgname: C{string}
313   @param vgname: the name of the volume group to ask for disk space information
314   @type hypervisor_type: C{str}
315   @param hypervisor_type: the name of the hypervisor to ask for
316       memory information
317   @rtype: C{dict}
318   @return: dictionary with the following keys:
319       - vg_size is the size of the configured volume group in MiB
320       - vg_free is the free size of the volume group in MiB
321       - memory_dom0 is the memory allocated for domain0 in MiB
322       - memory_free is the currently available (free) ram in MiB
323       - memory_total is the total number of ram in MiB
324
325   """
326   outputarray = {}
327   vginfo = _GetVGInfo(vgname)
328   outputarray['vg_size'] = vginfo['vg_size']
329   outputarray['vg_free'] = vginfo['vg_free']
330
331   hyper = hypervisor.GetHypervisor(hypervisor_type)
332   hyp_info = hyper.GetNodeInfo()
333   if hyp_info is not None:
334     outputarray.update(hyp_info)
335
336   f = open("/proc/sys/kernel/random/boot_id", 'r')
337   try:
338     outputarray["bootid"] = f.read(128).rstrip("\n")
339   finally:
340     f.close()
341
342   return outputarray
343
344
345 def VerifyNode(what, cluster_name):
346   """Verify the status of the local node.
347
348   Based on the input L{what} parameter, various checks are done on the
349   local node.
350
351   If the I{filelist} key is present, this list of
352   files is checksummed and the file/checksum pairs are returned.
353
354   If the I{nodelist} key is present, we check that we have
355   connectivity via ssh with the target nodes (and check the hostname
356   report).
357
358   If the I{node-net-test} key is present, we check that we have
359   connectivity to the given nodes via both primary IP and, if
360   applicable, secondary IPs.
361
362   @type what: C{dict}
363   @param what: a dictionary of things to check:
364       - filelist: list of files for which to compute checksums
365       - nodelist: list of nodes we should check ssh communication with
366       - node-net-test: list of nodes we should check node daemon port
367         connectivity with
368       - hypervisor: list with hypervisors to run the verify for
369   @rtype: dict
370   @return: a dictionary with the same keys as the input dict, and
371       values representing the result of the checks
372
373   """
374   result = {}
375
376   if constants.NV_HYPERVISOR in what:
377     result[constants.NV_HYPERVISOR] = tmp = {}
378     for hv_name in what[constants.NV_HYPERVISOR]:
379       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
380
381   if constants.NV_FILELIST in what:
382     result[constants.NV_FILELIST] = utils.FingerprintFiles(
383       what[constants.NV_FILELIST])
384
385   if constants.NV_NODELIST in what:
386     result[constants.NV_NODELIST] = tmp = {}
387     random.shuffle(what[constants.NV_NODELIST])
388     for node in what[constants.NV_NODELIST]:
389       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
390       if not success:
391         tmp[node] = message
392
393   if constants.NV_NODENETTEST in what:
394     result[constants.NV_NODENETTEST] = tmp = {}
395     my_name = utils.HostInfo().name
396     my_pip = my_sip = None
397     for name, pip, sip in what[constants.NV_NODENETTEST]:
398       if name == my_name:
399         my_pip = pip
400         my_sip = sip
401         break
402     if not my_pip:
403       tmp[my_name] = ("Can't find my own primary/secondary IP"
404                       " in the node list")
405     else:
406       port = utils.GetNodeDaemonPort()
407       for name, pip, sip in what[constants.NV_NODENETTEST]:
408         fail = []
409         if not utils.TcpPing(pip, port, source=my_pip):
410           fail.append("primary")
411         if sip != pip:
412           if not utils.TcpPing(sip, port, source=my_sip):
413             fail.append("secondary")
414         if fail:
415           tmp[name] = ("failure using the %s interface(s)" %
416                        " and ".join(fail))
417
418   if constants.NV_LVLIST in what:
419     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
420
421   if constants.NV_INSTANCELIST in what:
422     result[constants.NV_INSTANCELIST] = GetInstanceList(
423       what[constants.NV_INSTANCELIST])
424
425   if constants.NV_VGLIST in what:
426     result[constants.NV_VGLIST] = ListVolumeGroups()
427
428   if constants.NV_VERSION in what:
429     result[constants.NV_VERSION] = constants.PROTOCOL_VERSION
430
431   if constants.NV_HVINFO in what:
432     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
433     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
434
435   return result
436
437
438 def GetVolumeList(vg_name):
439   """Compute list of logical volumes and their size.
440
441   @type vg_name: str
442   @param vg_name: the volume group whose LVs we should list
443   @rtype: dict
444   @return:
445       dictionary of all partions (key) with value being a tuple of
446       their size (in MiB), inactive and online status::
447
448         {'test1': ('20.06', True, True)}
449
450       in case of errors, a string is returned with the error
451       details.
452
453   """
454   lvs = {}
455   sep = '|'
456   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
457                          "--separator=%s" % sep,
458                          "-olv_name,lv_size,lv_attr", vg_name])
459   if result.failed:
460     logging.error("Failed to list logical volumes, lvs output: %s",
461                   result.output)
462     return result.output
463
464   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
465   for line in result.stdout.splitlines():
466     line = line.strip()
467     match = valid_line_re.match(line)
468     if not match:
469       logging.error("Invalid line returned from lvs output: '%s'", line)
470       continue
471     name, size, attr = match.groups()
472     inactive = attr[4] == '-'
473     online = attr[5] == 'o'
474     lvs[name] = (size, inactive, online)
475
476   return lvs
477
478
479 def ListVolumeGroups():
480   """List the volume groups and their size.
481
482   @rtype: dict
483   @return: dictionary with keys volume name and values the
484       size of the volume
485
486   """
487   return utils.ListVolumeGroups()
488
489
490 def NodeVolumes():
491   """List all volumes on this node.
492
493   @rtype: list
494   @return:
495     A list of dictionaries, each having four keys:
496       - name: the logical volume name,
497       - size: the size of the logical volume
498       - dev: the physical device on which the LV lives
499       - vg: the volume group to which it belongs
500
501     In case of errors, we return an empty list and log the
502     error.
503
504     Note that since a logical volume can live on multiple physical
505     volumes, the resulting list might include a logical volume
506     multiple times.
507
508   """
509   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
510                          "--separator=|",
511                          "--options=lv_name,lv_size,devices,vg_name"])
512   if result.failed:
513     logging.error("Failed to list logical volumes, lvs output: %s",
514                   result.output)
515     return []
516
517   def parse_dev(dev):
518     if '(' in dev:
519       return dev.split('(')[0]
520     else:
521       return dev
522
523   def map_line(line):
524     return {
525       'name': line[0].strip(),
526       'size': line[1].strip(),
527       'dev': parse_dev(line[2].strip()),
528       'vg': line[3].strip(),
529     }
530
531   return [map_line(line.split('|')) for line in result.stdout.splitlines()
532           if line.count('|') >= 3]
533
534
535 def BridgesExist(bridges_list):
536   """Check if a list of bridges exist on the current node.
537
538   @rtype: boolean
539   @return: C{True} if all of them exist, C{False} otherwise
540
541   """
542   for bridge in bridges_list:
543     if not utils.BridgeExists(bridge):
544       return False
545
546   return True
547
548
549 def GetInstanceList(hypervisor_list):
550   """Provides a list of instances.
551
552   @type hypervisor_list: list
553   @param hypervisor_list: the list of hypervisors to query information
554
555   @rtype: list
556   @return: a list of all running instances on the current node
557     - instance1.example.com
558     - instance2.example.com
559
560   """
561   results = []
562   for hname in hypervisor_list:
563     try:
564       names = hypervisor.GetHypervisor(hname).ListInstances()
565       results.extend(names)
566     except errors.HypervisorError, err:
567       logging.exception("Error enumerating instances for hypevisor %s", hname)
568       # FIXME: should we somehow not propagate this to the master?
569       raise
570
571   return results
572
573
574 def GetInstanceInfo(instance, hname):
575   """Gives back the informations about an instance as a dictionary.
576
577   @type instance: string
578   @param instance: the instance name
579   @type hname: string
580   @param hname: the hypervisor type of the instance
581
582   @rtype: dict
583   @return: dictionary with the following keys:
584       - memory: memory size of instance (int)
585       - state: xen state of instance (string)
586       - time: cpu time of instance (float)
587
588   """
589   output = {}
590
591   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
592   if iinfo is not None:
593     output['memory'] = iinfo[2]
594     output['state'] = iinfo[4]
595     output['time'] = iinfo[5]
596
597   return output
598
599
600 def GetAllInstancesInfo(hypervisor_list):
601   """Gather data about all instances.
602
603   This is the equivalent of L{GetInstanceInfo}, except that it
604   computes data for all instances at once, thus being faster if one
605   needs data about more than one instance.
606
607   @type hypervisor_list: list
608   @param hypervisor_list: list of hypervisors to query for instance data
609
610   @rtype: dict
611   @return: dictionary of instance: data, with data having the following keys:
612       - memory: memory size of instance (int)
613       - state: xen state of instance (string)
614       - time: cpu time of instance (float)
615       - vcpus: the number of vcpus
616
617   """
618   output = {}
619
620   for hname in hypervisor_list:
621     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
622     if iinfo:
623       for name, inst_id, memory, vcpus, state, times in iinfo:
624         value = {
625           'memory': memory,
626           'vcpus': vcpus,
627           'state': state,
628           'time': times,
629           }
630         if name in output and output[name] != value:
631           raise errors.HypervisorError("Instance %s running duplicate"
632                                        " with different parameters" % name)
633         output[name] = value
634
635   return output
636
637
638 def AddOSToInstance(instance):
639   """Add an OS to an instance.
640
641   @type instance: L{objects.Instance}
642   @param instance: Instance whose OS is to be installed
643   @rtype: boolean
644   @return: the success of the operation
645
646   """
647   inst_os = OSFromDisk(instance.os)
648
649   create_env = OSEnvironment(instance)
650
651   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
652                                      instance.name, int(time.time()))
653
654   result = utils.RunCmd([inst_os.create_script], env=create_env,
655                         cwd=inst_os.path, output=logfile,)
656   if result.failed:
657     logging.error("os create command '%s' returned error: %s, logfile: %s,"
658                   " output: %s", result.cmd, result.fail_reason, logfile,
659                   result.output)
660     return False
661
662   return True
663
664
665 def RunRenameInstance(instance, old_name):
666   """Run the OS rename script for an instance.
667
668   @type instance: L{objects.Instance}
669   @param instance: Instance whose OS is to be installed
670   @type old_name: string
671   @param old_name: previous instance name
672   @rtype: boolean
673   @return: the success of the operation
674
675   """
676   inst_os = OSFromDisk(instance.os)
677
678   rename_env = OSEnvironment(instance)
679   rename_env['OLD_INSTANCE_NAME'] = old_name
680
681   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
682                                            old_name,
683                                            instance.name, int(time.time()))
684
685   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
686                         cwd=inst_os.path, output=logfile)
687
688   if result.failed:
689     logging.error("os create command '%s' returned error: %s output: %s",
690                   result.cmd, result.fail_reason, result.output)
691     return False
692
693   return True
694
695
696 def _GetVGInfo(vg_name):
697   """Get informations about the volume group.
698
699   @type vg_name: str
700   @param vg_name: the volume group which we query
701   @rtype: dict
702   @return:
703     A dictionary with the following keys:
704       - C{vg_size} is the total size of the volume group in MiB
705       - C{vg_free} is the free size of the volume group in MiB
706       - C{pv_count} are the number of physical disks in that VG
707
708     If an error occurs during gathering of data, we return the same dict
709     with keys all set to None.
710
711   """
712   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
713
714   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
715                          "--nosuffix", "--units=m", "--separator=:", vg_name])
716
717   if retval.failed:
718     logging.error("volume group %s not present", vg_name)
719     return retdic
720   valarr = retval.stdout.strip().rstrip(':').split(':')
721   if len(valarr) == 3:
722     try:
723       retdic = {
724         "vg_size": int(round(float(valarr[0]), 0)),
725         "vg_free": int(round(float(valarr[1]), 0)),
726         "pv_count": int(valarr[2]),
727         }
728     except ValueError, err:
729       logging.exception("Fail to parse vgs output")
730   else:
731     logging.error("vgs output has the wrong number of fields (expected"
732                   " three): %s", str(valarr))
733   return retdic
734
735
736 def _GatherBlockDevs(instance):
737   """Set up an instance's block device(s).
738
739   This is run on the primary node at instance startup. The block
740   devices must be already assembled.
741
742   @type instance: L{objects.Instance}
743   @param instance: the instance whose disks we shoul assemble
744   @rtype: list
745   @return: list of (disk_object, device_path)
746
747   """
748   block_devices = []
749   for disk in instance.disks:
750     device = _RecursiveFindBD(disk)
751     if device is None:
752       raise errors.BlockDeviceError("Block device '%s' is not set up." %
753                                     str(disk))
754     device.Open()
755     block_devices.append((disk, device.dev_path))
756   return block_devices
757
758
759 def StartInstance(instance, extra_args):
760   """Start an instance.
761
762   @type instance: L{objects.Instance}
763   @param instance: the instance object
764   @rtype: boolean
765   @return: whether the startup was successful or not
766
767   """
768   running_instances = GetInstanceList([instance.hypervisor])
769
770   if instance.name in running_instances:
771     return True
772
773   block_devices = _GatherBlockDevs(instance)
774   hyper = hypervisor.GetHypervisor(instance.hypervisor)
775
776   try:
777     hyper.StartInstance(instance, block_devices, extra_args)
778   except errors.HypervisorError, err:
779     logging.exception("Failed to start instance")
780     return False
781
782   return True
783
784
785 def ShutdownInstance(instance):
786   """Shut an instance down.
787
788   @note: this functions uses polling with a hardcoded timeout.
789
790   @type instance: L{objects.Instance}
791   @param instance: the instance object
792   @rtype: boolean
793   @return: whether the startup was successful or not
794
795   """
796   hv_name = instance.hypervisor
797   running_instances = GetInstanceList([hv_name])
798
799   if instance.name not in running_instances:
800     return True
801
802   hyper = hypervisor.GetHypervisor(hv_name)
803   try:
804     hyper.StopInstance(instance)
805   except errors.HypervisorError, err:
806     logging.error("Failed to stop instance")
807     return False
808
809   # test every 10secs for 2min
810
811   time.sleep(1)
812   for dummy in range(11):
813     if instance.name not in GetInstanceList([hv_name]):
814       break
815     time.sleep(10)
816   else:
817     # the shutdown did not succeed
818     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
819
820     try:
821       hyper.StopInstance(instance, force=True)
822     except errors.HypervisorError, err:
823       logging.exception("Failed to stop instance")
824       return False
825
826     time.sleep(1)
827     if instance.name in GetInstanceList([hv_name]):
828       logging.error("could not shutdown instance '%s' even by destroy",
829                     instance.name)
830       return False
831
832   return True
833
834
835 def RebootInstance(instance, reboot_type, extra_args):
836   """Reboot an instance.
837
838   @type instance: L{objects.Instance}
839   @param instance: the instance object to reboot
840   @type reboot_type: str
841   @param reboot_type: the type of reboot, one the following
842     constants:
843       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
844         instance OS, do not recreate the VM
845       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
846         restart the VM (at the hypervisor level)
847       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
848         is not accepted here, since that mode is handled
849         differently
850   @rtype: boolean
851   @return: the success of the operation
852
853   """
854   running_instances = GetInstanceList([instance.hypervisor])
855
856   if instance.name not in running_instances:
857     logging.error("Cannot reboot instance that is not running")
858     return False
859
860   hyper = hypervisor.GetHypervisor(instance.hypervisor)
861   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
862     try:
863       hyper.RebootInstance(instance)
864     except errors.HypervisorError, err:
865       logging.exception("Failed to soft reboot instance")
866       return False
867   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
868     try:
869       ShutdownInstance(instance)
870       StartInstance(instance, extra_args)
871     except errors.HypervisorError, err:
872       logging.exception("Failed to hard reboot instance")
873       return False
874   else:
875     raise errors.ParameterError("reboot_type invalid")
876
877   return True
878
879
880 def MigrateInstance(instance, target, live):
881   """Migrates an instance to another node.
882
883   @type instance: L{objects.Instance}
884   @param instance: the instance definition
885   @type target: string
886   @param target: the target node name
887   @type live: boolean
888   @param live: whether the migration should be done live or not (the
889       interpretation of this parameter is left to the hypervisor)
890   @rtype: tuple
891   @return: a tuple of (success, msg) where:
892       - succes is a boolean denoting the success/failure of the operation
893       - msg is a string with details in case of failure
894
895   """
896   hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
897
898   try:
899     hyper.MigrateInstance(instance.name, target, live)
900   except errors.HypervisorError, err:
901     msg = "Failed to migrate instance: %s" % str(err)
902     logging.error(msg)
903     return (False, msg)
904   return (True, "Migration successfull")
905
906
907 def CreateBlockDevice(disk, size, owner, on_primary, info):
908   """Creates a block device for an instance.
909
910   @type disk: L{objects.Disk}
911   @param disk: the object describing the disk we should create
912   @type size: int
913   @param size: the size of the physical underlying device, in MiB
914   @type owner: str
915   @param owner: the name of the instance for which disk is created,
916       used for device cache data
917   @type on_primary: boolean
918   @param on_primary:  indicates if it is the primary node or not
919   @type info: string
920   @param info: string that will be sent to the physical device
921       creation, used for example to set (LVM) tags on LVs
922
923   @return: the new unique_id of the device (this can sometime be
924       computed only after creation), or None. On secondary nodes,
925       it's not required to return anything.
926
927   """
928   clist = []
929   if disk.children:
930     for child in disk.children:
931       crdev = _RecursiveAssembleBD(child, owner, on_primary)
932       if on_primary or disk.AssembleOnSecondary():
933         # we need the children open in case the device itself has to
934         # be assembled
935         crdev.Open()
936       clist.append(crdev)
937   try:
938     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
939     if device is not None:
940       logging.info("removing existing device %s", disk)
941       device.Remove()
942   except errors.BlockDeviceError, err:
943     pass
944
945   device = bdev.Create(disk.dev_type, disk.physical_id,
946                        clist, size)
947   if device is None:
948     raise ValueError("Can't create child device for %s, %s" %
949                      (disk, size))
950   if on_primary or disk.AssembleOnSecondary():
951     if not device.Assemble():
952       errorstring = "Can't assemble device after creation"
953       logging.error(errorstring)
954       raise errors.BlockDeviceError("%s, very unusual event - check the node"
955                                     " daemon logs" % errorstring)
956     device.SetSyncSpeed(constants.SYNC_SPEED)
957     if on_primary or disk.OpenOnSecondary():
958       device.Open(force=True)
959     DevCacheManager.UpdateCache(device.dev_path, owner,
960                                 on_primary, disk.iv_name)
961
962   device.SetInfo(info)
963
964   physical_id = device.unique_id
965   return physical_id
966
967
968 def RemoveBlockDevice(disk):
969   """Remove a block device.
970
971   @note: This is intended to be called recursively.
972
973   @type disk: L{objects.Disk}
974   @param disk: the disk object we should remove
975   @rtype: boolean
976   @return: the success of the operation
977
978   """
979   try:
980     rdev = _RecursiveFindBD(disk)
981   except errors.BlockDeviceError, err:
982     # probably can't attach
983     logging.info("Can't attach to device %s in remove", disk)
984     rdev = None
985   if rdev is not None:
986     r_path = rdev.dev_path
987     result = rdev.Remove()
988     if result:
989       DevCacheManager.RemoveCache(r_path)
990   else:
991     result = True
992   if disk.children:
993     for child in disk.children:
994       result = result and RemoveBlockDevice(child)
995   return result
996
997
998 def _RecursiveAssembleBD(disk, owner, as_primary):
999   """Activate a block device for an instance.
1000
1001   This is run on the primary and secondary nodes for an instance.
1002
1003   @note: this function is called recursively.
1004
1005   @type disk: L{objects.Disk}
1006   @param disk: the disk we try to assemble
1007   @type owner: str
1008   @param owner: the name of the instance which owns the disk
1009   @type as_primary: boolean
1010   @param as_primary: if we should make the block device
1011       read/write
1012
1013   @return: the assembled device or None (in case no device
1014       was assembled)
1015   @raise errors.BlockDeviceError: in case there is an error
1016       during the activation of the children or the device
1017       itself
1018
1019   """
1020   children = []
1021   if disk.children:
1022     mcn = disk.ChildrenNeeded()
1023     if mcn == -1:
1024       mcn = 0 # max number of Nones allowed
1025     else:
1026       mcn = len(disk.children) - mcn # max number of Nones
1027     for chld_disk in disk.children:
1028       try:
1029         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1030       except errors.BlockDeviceError, err:
1031         if children.count(None) >= mcn:
1032           raise
1033         cdev = None
1034         logging.debug("Error in child activation: %s", str(err))
1035       children.append(cdev)
1036
1037   if as_primary or disk.AssembleOnSecondary():
1038     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
1039     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1040     result = r_dev
1041     if as_primary or disk.OpenOnSecondary():
1042       r_dev.Open()
1043     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1044                                 as_primary, disk.iv_name)
1045
1046   else:
1047     result = True
1048   return result
1049
1050
1051 def AssembleBlockDevice(disk, owner, as_primary):
1052   """Activate a block device for an instance.
1053
1054   This is a wrapper over _RecursiveAssembleBD.
1055
1056   @rtype: str or boolean
1057   @return: a C{/dev/...} path for primary nodes, and
1058       C{True} for secondary nodes
1059
1060   """
1061   result = _RecursiveAssembleBD(disk, owner, as_primary)
1062   if isinstance(result, bdev.BlockDev):
1063     result = result.dev_path
1064   return result
1065
1066
1067 def ShutdownBlockDevice(disk):
1068   """Shut down a block device.
1069
1070   First, if the device is assembled (Attach() is successfull), then
1071   the device is shutdown. Then the children of the device are
1072   shutdown.
1073
1074   This function is called recursively. Note that we don't cache the
1075   children or such, as oppossed to assemble, shutdown of different
1076   devices doesn't require that the upper device was active.
1077
1078   @type disk: L{objects.Disk}
1079   @param disk: the description of the disk we should
1080       shutdown
1081   @rtype: boolean
1082   @return: the success of the operation
1083
1084   """
1085   r_dev = _RecursiveFindBD(disk)
1086   if r_dev is not None:
1087     r_path = r_dev.dev_path
1088     result = r_dev.Shutdown()
1089     if result:
1090       DevCacheManager.RemoveCache(r_path)
1091   else:
1092     result = True
1093   if disk.children:
1094     for child in disk.children:
1095       result = result and ShutdownBlockDevice(child)
1096   return result
1097
1098
1099 def MirrorAddChildren(parent_cdev, new_cdevs):
1100   """Extend a mirrored block device.
1101
1102   @type parent_cdev: L{objects.Disk}
1103   @param parent_cdev: the disk to which we should add children
1104   @type new_cdevs: list of L{objects.Disk}
1105   @param new_cdevs: the list of children which we should add
1106   @rtype: boolean
1107   @return: the success of the operation
1108
1109   """
1110   parent_bdev = _RecursiveFindBD(parent_cdev)
1111   if parent_bdev is None:
1112     logging.error("Can't find parent device")
1113     return False
1114   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1115   if new_bdevs.count(None) > 0:
1116     logging.error("Can't find new device(s) to add: %s:%s",
1117                   new_bdevs, new_cdevs)
1118     return False
1119   parent_bdev.AddChildren(new_bdevs)
1120   return True
1121
1122
1123 def MirrorRemoveChildren(parent_cdev, new_cdevs):
1124   """Shrink a mirrored block device.
1125
1126   @type parent_cdev: L{objects.Disk}
1127   @param parent_cdev: the disk from which we should remove children
1128   @type new_cdevs: list of L{objects.Disk}
1129   @param new_cdevs: the list of children which we should remove
1130   @rtype: boolean
1131   @return: the success of the operation
1132
1133   """
1134   parent_bdev = _RecursiveFindBD(parent_cdev)
1135   if parent_bdev is None:
1136     logging.error("Can't find parent in remove children: %s", parent_cdev)
1137     return False
1138   devs = []
1139   for disk in new_cdevs:
1140     rpath = disk.StaticDevPath()
1141     if rpath is None:
1142       bd = _RecursiveFindBD(disk)
1143       if bd is None:
1144         logging.error("Can't find dynamic device %s while removing children",
1145                       disk)
1146         return False
1147       else:
1148         devs.append(bd.dev_path)
1149     else:
1150       devs.append(rpath)
1151   parent_bdev.RemoveChildren(devs)
1152   return True
1153
1154
1155 def GetMirrorStatus(disks):
1156   """Get the mirroring status of a list of devices.
1157
1158   @type disks: list of L{objects.Disk}
1159   @param disks: the list of disks which we should query
1160   @rtype: disk
1161   @return:
1162       a list of (mirror_done, estimated_time) tuples, which
1163       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1164   @raise errors.BlockDeviceError: if any of the disks cannot be
1165       found
1166
1167   """
1168   stats = []
1169   for dsk in disks:
1170     rbd = _RecursiveFindBD(dsk)
1171     if rbd is None:
1172       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1173     stats.append(rbd.CombinedSyncStatus())
1174   return stats
1175
1176
1177 def _RecursiveFindBD(disk):
1178   """Check if a device is activated.
1179
1180   If so, return informations about the real device.
1181
1182   @type disk: L{objects.Disk}
1183   @param disk: the disk object we need to find
1184
1185   @return: None if the device can't be found,
1186       otherwise the device instance
1187
1188   """
1189   children = []
1190   if disk.children:
1191     for chdisk in disk.children:
1192       children.append(_RecursiveFindBD(chdisk))
1193
1194   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1195
1196
1197 def FindBlockDevice(disk):
1198   """Check if a device is activated.
1199
1200   If it is, return informations about the real device.
1201
1202   @type disk: L{objects.Disk}
1203   @param disk: the disk to find
1204   @rtype: None or tuple
1205   @return: None if the disk cannot be found, otherwise a
1206       tuple (device_path, major, minor, sync_percent,
1207       estimated_time, is_degraded)
1208
1209   """
1210   rbd = _RecursiveFindBD(disk)
1211   if rbd is None:
1212     return rbd
1213   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1214
1215
1216 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1217   """Write a file to the filesystem.
1218
1219   This allows the master to overwrite(!) a file. It will only perform
1220   the operation if the file belongs to a list of configuration files.
1221
1222   @type file_name: str
1223   @param file_name: the target file name
1224   @type data: str
1225   @param data: the new contents of the file
1226   @type mode: int
1227   @param mode: the mode to give the file (can be None)
1228   @type uid: int
1229   @param uid: the owner of the file (can be -1 for default)
1230   @type gid: int
1231   @param gid: the group of the file (can be -1 for default)
1232   @type atime: float
1233   @param atime: the atime to set on the file (can be None)
1234   @type mtime: float
1235   @param mtime: the mtime to set on the file (can be None)
1236   @rtype: boolean
1237   @return: the success of the operation; errors are logged
1238       in the node daemon log
1239
1240   """
1241   if not os.path.isabs(file_name):
1242     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1243                   file_name)
1244     return False
1245
1246   allowed_files = [
1247     constants.CLUSTER_CONF_FILE,
1248     constants.ETC_HOSTS,
1249     constants.SSH_KNOWN_HOSTS_FILE,
1250     constants.VNC_PASSWORD_FILE,
1251     ]
1252
1253   if file_name not in allowed_files:
1254     logging.error("Filename passed to UploadFile not in allowed"
1255                  " upload targets: '%s'", file_name)
1256     return False
1257
1258   raw_data = _Decompress(data)
1259
1260   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1261                   atime=atime, mtime=mtime)
1262   return True
1263
1264
1265 def WriteSsconfFiles(values):
1266   """Update all ssconf files.
1267
1268   Wrapper around the SimpleStore.WriteFiles.
1269
1270   """
1271   ssconf.SimpleStore().WriteFiles(values)
1272
1273
1274 def _ErrnoOrStr(err):
1275   """Format an EnvironmentError exception.
1276
1277   If the L{err} argument has an errno attribute, it will be looked up
1278   and converted into a textual C{E...} description. Otherwise the
1279   string representation of the error will be returned.
1280
1281   @type err: L{EnvironmentError}
1282   @param err: the exception to format
1283
1284   """
1285   if hasattr(err, 'errno'):
1286     detail = errno.errorcode[err.errno]
1287   else:
1288     detail = str(err)
1289   return detail
1290
1291
1292 def _OSOndiskVersion(name, os_dir):
1293   """Compute and return the API version of a given OS.
1294
1295   This function will try to read the API version of the OS given by
1296   the 'name' parameter and residing in the 'os_dir' directory.
1297
1298   @type name: str
1299   @param name: the OS name we should look for
1300   @type os_dir: str
1301   @param os_dir: the directory inwhich we should look for the OS
1302   @rtype: int or None
1303   @return:
1304       Either an integer denoting the version or None in the
1305       case when this is not a valid OS name.
1306   @raise errors.InvalidOS: if the OS cannot be found
1307
1308   """
1309   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1310
1311   try:
1312     st = os.stat(api_file)
1313   except EnvironmentError, err:
1314     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1315                            " found (%s)" % _ErrnoOrStr(err))
1316
1317   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1318     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1319                            " a regular file")
1320
1321   try:
1322     f = open(api_file)
1323     try:
1324       api_versions = f.readlines()
1325     finally:
1326       f.close()
1327   except EnvironmentError, err:
1328     raise errors.InvalidOS(name, os_dir, "error while reading the"
1329                            " API version (%s)" % _ErrnoOrStr(err))
1330
1331   api_versions = [version.strip() for version in api_versions]
1332   try:
1333     api_versions = [int(version) for version in api_versions]
1334   except (TypeError, ValueError), err:
1335     raise errors.InvalidOS(name, os_dir,
1336                            "API version is not integer (%s)" % str(err))
1337
1338   return api_versions
1339
1340
1341 def DiagnoseOS(top_dirs=None):
1342   """Compute the validity for all OSes.
1343
1344   @type top_dirs: list
1345   @param top_dirs: the list of directories in which to
1346       search (if not given defaults to
1347       L{constants.OS_SEARCH_PATH})
1348   @rtype: list of L{objects.OS}
1349   @return: an OS object for each name in all the given
1350       directories
1351
1352   """
1353   if top_dirs is None:
1354     top_dirs = constants.OS_SEARCH_PATH
1355
1356   result = []
1357   for dir_name in top_dirs:
1358     if os.path.isdir(dir_name):
1359       try:
1360         f_names = utils.ListVisibleFiles(dir_name)
1361       except EnvironmentError, err:
1362         logging.exception("Can't list the OS directory %s", dir_name)
1363         break
1364       for name in f_names:
1365         try:
1366           os_inst = OSFromDisk(name, base_dir=dir_name)
1367           result.append(os_inst)
1368         except errors.InvalidOS, err:
1369           result.append(objects.OS.FromInvalidOS(err))
1370
1371   return result
1372
1373
1374 def OSFromDisk(name, base_dir=None):
1375   """Create an OS instance from disk.
1376
1377   This function will return an OS instance if the given name is a
1378   valid OS name. Otherwise, it will raise an appropriate
1379   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1380
1381   @type base_dir: string
1382   @keyword base_dir: Base directory containing OS installations.
1383                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1384   @rtype: L{objects.OS}
1385   @return: the OS instance if we find a valid one
1386   @raise errors.InvalidOS: if we don't find a valid OS
1387
1388   """
1389   if base_dir is None:
1390     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1391     if os_dir is None:
1392       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1393   else:
1394     os_dir = os.path.sep.join([base_dir, name])
1395
1396   api_versions = _OSOndiskVersion(name, os_dir)
1397
1398   if constants.OS_API_VERSION not in api_versions:
1399     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1400                            " (found %s want %s)"
1401                            % (api_versions, constants.OS_API_VERSION))
1402
1403   # OS Scripts dictionary, we will populate it with the actual script names
1404   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1405
1406   for script in os_scripts:
1407     os_scripts[script] = os.path.sep.join([os_dir, script])
1408
1409     try:
1410       st = os.stat(os_scripts[script])
1411     except EnvironmentError, err:
1412       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1413                              (script, _ErrnoOrStr(err)))
1414
1415     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1416       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1417                              script)
1418
1419     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1420       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1421                              script)
1422
1423
1424   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1425                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1426                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1427                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1428                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1429                     api_versions=api_versions)
1430
1431 def OSEnvironment(instance, debug=0):
1432   """Calculate the environment for an os script.
1433
1434   @type instance: L{objects.Instance}
1435   @param instance: target instance for the os script run
1436   @type debug: integer
1437   @param debug: debug level (0 or 1, for OS Api 10)
1438   @rtype: dict
1439   @return: dict of environment variables
1440   @raise errors.BlockDeviceError: if the block device
1441       cannot be found
1442
1443   """
1444   result = {}
1445   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1446   result['INSTANCE_NAME'] = instance.name
1447   result['HYPERVISOR'] = instance.hypervisor
1448   result['DISK_COUNT'] = '%d' % len(instance.disks)
1449   result['NIC_COUNT'] = '%d' % len(instance.nics)
1450   result['DEBUG_LEVEL'] = '%d' % debug
1451   for idx, disk in enumerate(instance.disks):
1452     real_disk = _RecursiveFindBD(disk)
1453     if real_disk is None:
1454       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1455                                     str(disk))
1456     real_disk.Open()
1457     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1458     # FIXME: When disks will have read-only mode, populate this
1459     result['DISK_%d_ACCESS' % idx] = 'W'
1460     if constants.HV_DISK_TYPE in instance.hvparams:
1461       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1462         instance.hvparams[constants.HV_DISK_TYPE]
1463     if disk.dev_type in constants.LDS_BLOCK:
1464       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1465     elif disk.dev_type == constants.LD_FILE:
1466       result['DISK_%d_BACKEND_TYPE' % idx] = \
1467         'file:%s' % disk.physical_id[0]
1468   for idx, nic in enumerate(instance.nics):
1469     result['NIC_%d_MAC' % idx] = nic.mac
1470     if nic.ip:
1471       result['NIC_%d_IP' % idx] = nic.ip
1472     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1473     if constants.HV_NIC_TYPE in instance.hvparams:
1474       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1475         instance.hvparams[constants.HV_NIC_TYPE]
1476
1477   return result
1478
1479 def GrowBlockDevice(disk, amount):
1480   """Grow a stack of block devices.
1481
1482   This function is called recursively, with the childrens being the
1483   first ones to resize.
1484
1485   @type disk: L{objects.Disk}
1486   @param disk: the disk to be grown
1487   @rtype: (status, result)
1488   @return: a tuple with the status of the operation
1489       (True/False), and the errors message if status
1490       is False
1491
1492   """
1493   r_dev = _RecursiveFindBD(disk)
1494   if r_dev is None:
1495     return False, "Cannot find block device %s" % (disk,)
1496
1497   try:
1498     r_dev.Grow(amount)
1499   except errors.BlockDeviceError, err:
1500     return False, str(err)
1501
1502   return True, None
1503
1504
1505 def SnapshotBlockDevice(disk):
1506   """Create a snapshot copy of a block device.
1507
1508   This function is called recursively, and the snapshot is actually created
1509   just for the leaf lvm backend device.
1510
1511   @type disk: L{objects.Disk}
1512   @param disk: the disk to be snapshotted
1513   @rtype: string
1514   @return: snapshot disk path
1515
1516   """
1517   if disk.children:
1518     if len(disk.children) == 1:
1519       # only one child, let's recurse on it
1520       return SnapshotBlockDevice(disk.children[0])
1521     else:
1522       # more than one child, choose one that matches
1523       for child in disk.children:
1524         if child.size == disk.size:
1525           # return implies breaking the loop
1526           return SnapshotBlockDevice(child)
1527   elif disk.dev_type == constants.LD_LV:
1528     r_dev = _RecursiveFindBD(disk)
1529     if r_dev is not None:
1530       # let's stay on the safe side and ask for the full size, for now
1531       return r_dev.Snapshot(disk.size)
1532     else:
1533       return None
1534   else:
1535     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1536                                  " '%s' of type '%s'" %
1537                                  (disk.unique_id, disk.dev_type))
1538
1539
1540 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1541   """Export a block device snapshot to a remote node.
1542
1543   @type disk: L{objects.Disk}
1544   @param disk: the description of the disk to export
1545   @type dest_node: str
1546   @param dest_node: the destination node to export to
1547   @type instance: L{objects.Instance}
1548   @param instance: the instance object to whom the disk belongs
1549   @type cluster_name: str
1550   @param cluster_name: the cluster name, needed for SSH hostalias
1551   @type idx: int
1552   @param idx: the index of the disk in the instance's disk list,
1553       used to export to the OS scripts environment
1554   @rtype: boolean
1555   @return: the success of the operation
1556
1557   """
1558   export_env = OSEnvironment(instance)
1559
1560   inst_os = OSFromDisk(instance.os)
1561   export_script = inst_os.export_script
1562
1563   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1564                                      instance.name, int(time.time()))
1565   if not os.path.exists(constants.LOG_OS_DIR):
1566     os.mkdir(constants.LOG_OS_DIR, 0750)
1567   real_disk = _RecursiveFindBD(disk)
1568   if real_disk is None:
1569     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1570                                   str(disk))
1571   real_disk.Open()
1572
1573   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1574   export_env['EXPORT_INDEX'] = str(idx)
1575
1576   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1577   destfile = disk.physical_id[1]
1578
1579   # the target command is built out of three individual commands,
1580   # which are joined by pipes; we check each individual command for
1581   # valid parameters
1582   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1583                                export_script, logfile)
1584
1585   comprcmd = "gzip"
1586
1587   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1588                                 destdir, destdir, destfile)
1589   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1590                                                    constants.GANETI_RUNAS,
1591                                                    destcmd)
1592
1593   # all commands have been checked, so we're safe to combine them
1594   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1595
1596   result = utils.RunCmd(command, env=export_env)
1597
1598   if result.failed:
1599     logging.error("os snapshot export command '%s' returned error: %s"
1600                   " output: %s", command, result.fail_reason, result.output)
1601     return False
1602
1603   return True
1604
1605
1606 def FinalizeExport(instance, snap_disks):
1607   """Write out the export configuration information.
1608
1609   @type instance: L{objects.Instance}
1610   @param instance: the instance which we export, used for
1611       saving configuration
1612   @type snap_disks: list of L{objects.Disk}
1613   @param snap_disks: list of snapshot block devices, which
1614       will be used to get the actual name of the dump file
1615
1616   @rtype: boolean
1617   @return: the success of the operation
1618
1619   """
1620   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1621   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1622
1623   config = objects.SerializableConfigParser()
1624
1625   config.add_section(constants.INISECT_EXP)
1626   config.set(constants.INISECT_EXP, 'version', '0')
1627   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1628   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1629   config.set(constants.INISECT_EXP, 'os', instance.os)
1630   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1631
1632   config.add_section(constants.INISECT_INS)
1633   config.set(constants.INISECT_INS, 'name', instance.name)
1634   config.set(constants.INISECT_INS, 'memory', '%d' %
1635              instance.beparams[constants.BE_MEMORY])
1636   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1637              instance.beparams[constants.BE_VCPUS])
1638   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1639
1640   nic_total = 0
1641   for nic_count, nic in enumerate(instance.nics):
1642     nic_total += 1
1643     config.set(constants.INISECT_INS, 'nic%d_mac' %
1644                nic_count, '%s' % nic.mac)
1645     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1646     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1647                '%s' % nic.bridge)
1648   # TODO: redundant: on load can read nics until it doesn't exist
1649   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1650
1651   disk_total = 0
1652   for disk_count, disk in enumerate(snap_disks):
1653     if disk:
1654       disk_total += 1
1655       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1656                  ('%s' % disk.iv_name))
1657       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1658                  ('%s' % disk.physical_id[1]))
1659       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1660                  ('%d' % disk.size))
1661
1662   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1663
1664   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1665                   data=config.Dumps())
1666   shutil.rmtree(finaldestdir, True)
1667   shutil.move(destdir, finaldestdir)
1668
1669   return True
1670
1671
1672 def ExportInfo(dest):
1673   """Get export configuration information.
1674
1675   @type dest: str
1676   @param dest: directory containing the export
1677
1678   @rtype: L{objects.SerializableConfigParser}
1679   @return: a serializable config file containing the
1680       export info
1681
1682   """
1683   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1684
1685   config = objects.SerializableConfigParser()
1686   config.read(cff)
1687
1688   if (not config.has_section(constants.INISECT_EXP) or
1689       not config.has_section(constants.INISECT_INS)):
1690     return None
1691
1692   return config
1693
1694
1695 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1696   """Import an os image into an instance.
1697
1698   @type instance: L{objects.Instance}
1699   @param instance: instance to import the disks into
1700   @type src_node: string
1701   @param src_node: source node for the disk images
1702   @type src_images: list of string
1703   @param src_images: absolute paths of the disk images
1704   @rtype: list of boolean
1705   @return: each boolean represent the success of importing the n-th disk
1706
1707   """
1708   import_env = OSEnvironment(instance)
1709   inst_os = OSFromDisk(instance.os)
1710   import_script = inst_os.import_script
1711
1712   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1713                                         instance.name, int(time.time()))
1714   if not os.path.exists(constants.LOG_OS_DIR):
1715     os.mkdir(constants.LOG_OS_DIR, 0750)
1716
1717   comprcmd = "gunzip"
1718   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1719                                import_script, logfile)
1720
1721   final_result = []
1722   for idx, image in enumerate(src_images):
1723     if image:
1724       destcmd = utils.BuildShellCmd('cat %s', image)
1725       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1726                                                        constants.GANETI_RUNAS,
1727                                                        destcmd)
1728       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1729       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1730       import_env['IMPORT_INDEX'] = str(idx)
1731       result = utils.RunCmd(command, env=import_env)
1732       if result.failed:
1733         logging.error("Disk import command '%s' returned error: %s"
1734                       " output: %s", command, result.fail_reason,
1735                       result.output)
1736         final_result.append(False)
1737       else:
1738         final_result.append(True)
1739     else:
1740       final_result.append(True)
1741
1742   return final_result
1743
1744
1745 def ListExports():
1746   """Return a list of exports currently available on this machine.
1747
1748   @rtype: list
1749   @return: list of the exports
1750
1751   """
1752   if os.path.isdir(constants.EXPORT_DIR):
1753     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1754   else:
1755     return []
1756
1757
1758 def RemoveExport(export):
1759   """Remove an existing export from the node.
1760
1761   @type export: str
1762   @param export: the name of the export to remove
1763   @rtype: boolean
1764   @return: the success of the operation
1765
1766   """
1767   target = os.path.join(constants.EXPORT_DIR, export)
1768
1769   shutil.rmtree(target)
1770   # TODO: catch some of the relevant exceptions and provide a pretty
1771   # error message if rmtree fails.
1772
1773   return True
1774
1775
1776 def RenameBlockDevices(devlist):
1777   """Rename a list of block devices.
1778
1779   @type devlist: list of tuples
1780   @param devlist: list of tuples of the form  (disk,
1781       new_logical_id, new_physical_id); disk is an
1782       L{objects.Disk} object describing the current disk,
1783       and new logical_id/physical_id is the name we
1784       rename it to
1785   @rtype: boolean
1786   @return: True if all renames succeeded, False otherwise
1787
1788   """
1789   result = True
1790   for disk, unique_id in devlist:
1791     dev = _RecursiveFindBD(disk)
1792     if dev is None:
1793       result = False
1794       continue
1795     try:
1796       old_rpath = dev.dev_path
1797       dev.Rename(unique_id)
1798       new_rpath = dev.dev_path
1799       if old_rpath != new_rpath:
1800         DevCacheManager.RemoveCache(old_rpath)
1801         # FIXME: we should add the new cache information here, like:
1802         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1803         # but we don't have the owner here - maybe parse from existing
1804         # cache? for now, we only lose lvm data when we rename, which
1805         # is less critical than DRBD or MD
1806     except errors.BlockDeviceError, err:
1807       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1808       result = False
1809   return result
1810
1811
1812 def _TransformFileStorageDir(file_storage_dir):
1813   """Checks whether given file_storage_dir is valid.
1814
1815   Checks wheter the given file_storage_dir is within the cluster-wide
1816   default file_storage_dir stored in SimpleStore. Only paths under that
1817   directory are allowed.
1818
1819   @type file_storage_dir: str
1820   @param file_storage_dir: the path to check
1821
1822   @return: the normalized path if valid, None otherwise
1823
1824   """
1825   cfg = _GetConfig()
1826   file_storage_dir = os.path.normpath(file_storage_dir)
1827   base_file_storage_dir = cfg.GetFileStorageDir()
1828   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1829       base_file_storage_dir):
1830     logging.error("file storage directory '%s' is not under base file"
1831                   " storage directory '%s'",
1832                   file_storage_dir, base_file_storage_dir)
1833     return None
1834   return file_storage_dir
1835
1836
1837 def CreateFileStorageDir(file_storage_dir):
1838   """Create file storage directory.
1839
1840   @type file_storage_dir: str
1841   @param file_storage_dir: directory to create
1842
1843   @rtype: tuple
1844   @return: tuple with first element a boolean indicating wheter dir
1845       creation was successful or not
1846
1847   """
1848   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1849   result = True,
1850   if not file_storage_dir:
1851     result = False,
1852   else:
1853     if os.path.exists(file_storage_dir):
1854       if not os.path.isdir(file_storage_dir):
1855         logging.error("'%s' is not a directory", file_storage_dir)
1856         result = False,
1857     else:
1858       try:
1859         os.makedirs(file_storage_dir, 0750)
1860       except OSError, err:
1861         logging.error("Cannot create file storage directory '%s': %s",
1862                       file_storage_dir, err)
1863         result = False,
1864   return result
1865
1866
1867 def RemoveFileStorageDir(file_storage_dir):
1868   """Remove file storage directory.
1869
1870   Remove it only if it's empty. If not log an error and return.
1871
1872   @type file_storage_dir: str
1873   @param file_storage_dir: the directory we should cleanup
1874   @rtype: tuple (success,)
1875   @return: tuple of one element, C{success}, denoting
1876       whether the operation was successfull
1877
1878   """
1879   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1880   result = True,
1881   if not file_storage_dir:
1882     result = False,
1883   else:
1884     if os.path.exists(file_storage_dir):
1885       if not os.path.isdir(file_storage_dir):
1886         logging.error("'%s' is not a directory", file_storage_dir)
1887         result = False,
1888       # deletes dir only if empty, otherwise we want to return False
1889       try:
1890         os.rmdir(file_storage_dir)
1891       except OSError, err:
1892         logging.exception("Cannot remove file storage directory '%s'",
1893                           file_storage_dir)
1894         result = False,
1895   return result
1896
1897
1898 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1899   """Rename the file storage directory.
1900
1901   @type old_file_storage_dir: str
1902   @param old_file_storage_dir: the current path
1903   @type new_file_storage_dir: str
1904   @param new_file_storage_dir: the name we should rename to
1905   @rtype: tuple (success,)
1906   @return: tuple of one element, C{success}, denoting
1907       whether the operation was successful
1908
1909   """
1910   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1911   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1912   result = True,
1913   if not old_file_storage_dir or not new_file_storage_dir:
1914     result = False,
1915   else:
1916     if not os.path.exists(new_file_storage_dir):
1917       if os.path.isdir(old_file_storage_dir):
1918         try:
1919           os.rename(old_file_storage_dir, new_file_storage_dir)
1920         except OSError, err:
1921           logging.exception("Cannot rename '%s' to '%s'",
1922                             old_file_storage_dir, new_file_storage_dir)
1923           result =  False,
1924       else:
1925         logging.error("'%s' is not a directory", old_file_storage_dir)
1926         result = False,
1927     else:
1928       if os.path.exists(old_file_storage_dir):
1929         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1930                       old_file_storage_dir, new_file_storage_dir)
1931         result = False,
1932   return result
1933
1934
1935 def _IsJobQueueFile(file_name):
1936   """Checks whether the given filename is in the queue directory.
1937
1938   @type file_name: str
1939   @param file_name: the file name we should check
1940   @rtype: boolean
1941   @return: whether the file is under the queue directory
1942
1943   """
1944   queue_dir = os.path.normpath(constants.QUEUE_DIR)
1945   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
1946
1947   if not result:
1948     logging.error("'%s' is not a file in the queue directory",
1949                   file_name)
1950
1951   return result
1952
1953
1954 def JobQueueUpdate(file_name, content):
1955   """Updates a file in the queue directory.
1956
1957   This is just a wrapper over L{utils.WriteFile}, with proper
1958   checking.
1959
1960   @type file_name: str
1961   @param file_name: the job file name
1962   @type content: str
1963   @param content: the new job contents
1964   @rtype: boolean
1965   @return: the success of the operation
1966
1967   """
1968   if not _IsJobQueueFile(file_name):
1969     return False
1970
1971   # Write and replace the file atomically
1972   utils.WriteFile(file_name, data=_Decompress(content))
1973
1974   return True
1975
1976
1977 def JobQueueRename(old, new):
1978   """Renames a job queue file.
1979
1980   This is just a wrapper over os.rename with proper checking.
1981
1982   @type old: str
1983   @param old: the old (actual) file name
1984   @type new: str
1985   @param new: the desired file name
1986   @rtype: boolean
1987   @return: the success of the operation
1988
1989   """
1990   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
1991     return False
1992
1993   utils.RenameFile(old, new, mkdir=True)
1994
1995   return True
1996
1997
1998 def JobQueueSetDrainFlag(drain_flag):
1999   """Set the drain flag for the queue.
2000
2001   This will set or unset the queue drain flag.
2002
2003   @type drain_flag: boolean
2004   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2005   @rtype: boolean
2006   @return: always True
2007   @warning: the function always returns True
2008
2009   """
2010   if drain_flag:
2011     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2012   else:
2013     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2014
2015   return True
2016
2017
2018 def CloseBlockDevices(disks):
2019   """Closes the given block devices.
2020
2021   This means they will be switched to secondary mode (in case of
2022   DRBD).
2023
2024   @type disks: list of L{objects.Disk}
2025   @param disks: the list of disks to be closed
2026   @rtype: tuple (success, message)
2027   @return: a tuple of success and message, where success
2028       indicates the succes of the operation, and message
2029       which will contain the error details in case we
2030       failed
2031
2032   """
2033   bdevs = []
2034   for cf in disks:
2035     rd = _RecursiveFindBD(cf)
2036     if rd is None:
2037       return (False, "Can't find device %s" % cf)
2038     bdevs.append(rd)
2039
2040   msg = []
2041   for rd in bdevs:
2042     try:
2043       rd.Close()
2044     except errors.BlockDeviceError, err:
2045       msg.append(str(err))
2046   if msg:
2047     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2048   else:
2049     return (True, "All devices secondary")
2050
2051
2052 def ValidateHVParams(hvname, hvparams):
2053   """Validates the given hypervisor parameters.
2054
2055   @type hvname: string
2056   @param hvname: the hypervisor name
2057   @type hvparams: dict
2058   @param hvparams: the hypervisor parameters to be validated
2059   @rtype: tuple (success, message)
2060   @return: a tuple of success and message, where success
2061       indicates the succes of the operation, and message
2062       which will contain the error details in case we
2063       failed
2064
2065   """
2066   try:
2067     hv_type = hypervisor.GetHypervisor(hvname)
2068     hv_type.ValidateParameters(hvparams)
2069     return (True, "Validation passed")
2070   except errors.HypervisorError, err:
2071     return (False, str(err))
2072
2073
2074 def DemoteFromMC():
2075   """Demotes the current node from master candidate role.
2076
2077   """
2078   # try to ensure we're not the master by mistake
2079   master, myself = ssconf.GetMasterAndMyself()
2080   if master == myself:
2081     return (False, "ssconf status shows I'm the master node, will not demote")
2082   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2083   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2084     return (False, "The master daemon is running, will not demote")
2085   try:
2086     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2087   except EnvironmentError, err:
2088     if err.errno != errno.ENOENT:
2089       return (False, "Error while backing up cluster file: %s" % str(err))
2090   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2091   return (True, "Done")
2092
2093
2094 class HooksRunner(object):
2095   """Hook runner.
2096
2097   This class is instantiated on the node side (ganeti-noded) and not
2098   on the master side.
2099
2100   """
2101   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2102
2103   def __init__(self, hooks_base_dir=None):
2104     """Constructor for hooks runner.
2105
2106     @type hooks_base_dir: str or None
2107     @param hooks_base_dir: if not None, this overrides the
2108         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2109
2110     """
2111     if hooks_base_dir is None:
2112       hooks_base_dir = constants.HOOKS_BASE_DIR
2113     self._BASE_DIR = hooks_base_dir
2114
2115   @staticmethod
2116   def ExecHook(script, env):
2117     """Exec one hook script.
2118
2119     @type script: str
2120     @param script: the full path to the script
2121     @type env: dict
2122     @param env: the environment with which to exec the script
2123     @rtype: tuple (success, message)
2124     @return: a tuple of success and message, where success
2125         indicates the succes of the operation, and message
2126         which will contain the error details in case we
2127         failed
2128
2129     """
2130     # exec the process using subprocess and log the output
2131     fdstdin = None
2132     try:
2133       fdstdin = open("/dev/null", "r")
2134       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2135                                stderr=subprocess.STDOUT, close_fds=True,
2136                                shell=False, cwd="/", env=env)
2137       output = ""
2138       try:
2139         output = child.stdout.read(4096)
2140         child.stdout.close()
2141       except EnvironmentError, err:
2142         output += "Hook script error: %s" % str(err)
2143
2144       while True:
2145         try:
2146           result = child.wait()
2147           break
2148         except EnvironmentError, err:
2149           if err.errno == errno.EINTR:
2150             continue
2151           raise
2152     finally:
2153       # try not to leak fds
2154       for fd in (fdstdin, ):
2155         if fd is not None:
2156           try:
2157             fd.close()
2158           except EnvironmentError, err:
2159             # just log the error
2160             #logging.exception("Error while closing fd %s", fd)
2161             pass
2162
2163     return result == 0, output
2164
2165   def RunHooks(self, hpath, phase, env):
2166     """Run the scripts in the hooks directory.
2167
2168     @type hpath: str
2169     @param hpath: the path to the hooks directory which
2170         holds the scripts
2171     @type phase: str
2172     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2173         L{constants.HOOKS_PHASE_POST}
2174     @type env: dict
2175     @param env: dictionary with the environment for the hook
2176     @rtype: list
2177     @return: list of 3-element tuples:
2178       - script path
2179       - script result, either L{constants.HKR_SUCCESS} or
2180         L{constants.HKR_FAIL}
2181       - output of the script
2182
2183     @raise errors.ProgrammerError: for invalid input
2184         parameters
2185
2186     """
2187     if phase == constants.HOOKS_PHASE_PRE:
2188       suffix = "pre"
2189     elif phase == constants.HOOKS_PHASE_POST:
2190       suffix = "post"
2191     else:
2192       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2193     rr = []
2194
2195     subdir = "%s-%s.d" % (hpath, suffix)
2196     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2197     try:
2198       dir_contents = utils.ListVisibleFiles(dir_name)
2199     except OSError, err:
2200       # FIXME: must log output in case of failures
2201       return rr
2202
2203     # we use the standard python sort order,
2204     # so 00name is the recommended naming scheme
2205     dir_contents.sort()
2206     for relname in dir_contents:
2207       fname = os.path.join(dir_name, relname)
2208       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2209           self.RE_MASK.match(relname) is not None):
2210         rrval = constants.HKR_SKIP
2211         output = ""
2212       else:
2213         result, output = self.ExecHook(fname, env)
2214         if not result:
2215           rrval = constants.HKR_FAIL
2216         else:
2217           rrval = constants.HKR_SUCCESS
2218       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2219
2220     return rr
2221
2222
2223 class IAllocatorRunner(object):
2224   """IAllocator runner.
2225
2226   This class is instantiated on the node side (ganeti-noded) and not on
2227   the master side.
2228
2229   """
2230   def Run(self, name, idata):
2231     """Run an iallocator script.
2232
2233     @type name: str
2234     @param name: the iallocator script name
2235     @type idata: str
2236     @param idata: the allocator input data
2237
2238     @rtype: tuple
2239     @return: four element tuple of:
2240        - run status (one of the IARUN_ constants)
2241        - stdout
2242        - stderr
2243        - fail reason (as from L{utils.RunResult})
2244
2245     """
2246     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2247                                   os.path.isfile)
2248     if alloc_script is None:
2249       return (constants.IARUN_NOTFOUND, None, None, None)
2250
2251     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2252     try:
2253       os.write(fd, idata)
2254       os.close(fd)
2255       result = utils.RunCmd([alloc_script, fin_name])
2256       if result.failed:
2257         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2258                 result.fail_reason)
2259     finally:
2260       os.unlink(fin_name)
2261
2262     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2263
2264
2265 class DevCacheManager(object):
2266   """Simple class for managing a cache of block device information.
2267
2268   """
2269   _DEV_PREFIX = "/dev/"
2270   _ROOT_DIR = constants.BDEV_CACHE_DIR
2271
2272   @classmethod
2273   def _ConvertPath(cls, dev_path):
2274     """Converts a /dev/name path to the cache file name.
2275
2276     This replaces slashes with underscores and strips the /dev
2277     prefix. It then returns the full path to the cache file.
2278
2279     @type dev_path: str
2280     @param dev_path: the C{/dev/} path name
2281     @rtype: str
2282     @return: the converted path name
2283
2284     """
2285     if dev_path.startswith(cls._DEV_PREFIX):
2286       dev_path = dev_path[len(cls._DEV_PREFIX):]
2287     dev_path = dev_path.replace("/", "_")
2288     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2289     return fpath
2290
2291   @classmethod
2292   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2293     """Updates the cache information for a given device.
2294
2295     @type dev_path: str
2296     @param dev_path: the pathname of the device
2297     @type owner: str
2298     @param owner: the owner (instance name) of the device
2299     @type on_primary: bool
2300     @param on_primary: whether this is the primary
2301         node nor not
2302     @type iv_name: str
2303     @param iv_name: the instance-visible name of the
2304         device, as in objects.Disk.iv_name
2305
2306     @rtype: None
2307
2308     """
2309     if dev_path is None:
2310       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2311       return
2312     fpath = cls._ConvertPath(dev_path)
2313     if on_primary:
2314       state = "primary"
2315     else:
2316       state = "secondary"
2317     if iv_name is None:
2318       iv_name = "not_visible"
2319     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2320     try:
2321       utils.WriteFile(fpath, data=fdata)
2322     except EnvironmentError, err:
2323       logging.exception("Can't update bdev cache for %s", dev_path)
2324
2325   @classmethod
2326   def RemoveCache(cls, dev_path):
2327     """Remove data for a dev_path.
2328
2329     This is just a wrapper over L{utils.RemoveFile} with a converted
2330     path name and logging.
2331
2332     @type dev_path: str
2333     @param dev_path: the pathname of the device
2334
2335     @rtype: None
2336
2337     """
2338     if dev_path is None:
2339       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2340       return
2341     fpath = cls._ConvertPath(dev_path)
2342     try:
2343       utils.RemoveFile(fpath)
2344     except EnvironmentError, err:
2345       logging.exception("Can't update bdev cache for %s", dev_path)