Fill the 'call' attribute of offline rpc results
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     logging.exception("Error while processing user ssh files")
264     return False
265
266   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
267     utils.WriteFile(name, data=content, mode=0600)
268
269   utils.AddAuthorizedKey(auth_keys, sshpub)
270
271   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
272
273   return True
274
275
276 def LeaveCluster():
277   """Cleans up and remove the current node.
278
279   This function cleans up and prepares the current node to be removed
280   from the cluster.
281
282   If processing is successful, then it raises an
283   L{errors.QuitGanetiException} which is used as a special case to
284   shutdown the node daemon.
285
286   """
287   _CleanDirectory(constants.DATA_DIR)
288   JobQueuePurge()
289
290   try:
291     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
292   except errors.OpExecError:
293     logging.exception("Error while processing ssh files")
294     return
295
296   f = open(pub_key, 'r')
297   try:
298     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
299   finally:
300     f.close()
301
302   utils.RemoveFile(priv_key)
303   utils.RemoveFile(pub_key)
304
305   # Return a reassuring string to the caller, and quit
306   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
307
308
309 def GetNodeInfo(vgname, hypervisor_type):
310   """Gives back a hash with different informations about the node.
311
312   @type vgname: C{string}
313   @param vgname: the name of the volume group to ask for disk space information
314   @type hypervisor_type: C{str}
315   @param hypervisor_type: the name of the hypervisor to ask for
316       memory information
317   @rtype: C{dict}
318   @return: dictionary with the following keys:
319       - vg_size is the size of the configured volume group in MiB
320       - vg_free is the free size of the volume group in MiB
321       - memory_dom0 is the memory allocated for domain0 in MiB
322       - memory_free is the currently available (free) ram in MiB
323       - memory_total is the total number of ram in MiB
324
325   """
326   outputarray = {}
327   vginfo = _GetVGInfo(vgname)
328   outputarray['vg_size'] = vginfo['vg_size']
329   outputarray['vg_free'] = vginfo['vg_free']
330
331   hyper = hypervisor.GetHypervisor(hypervisor_type)
332   hyp_info = hyper.GetNodeInfo()
333   if hyp_info is not None:
334     outputarray.update(hyp_info)
335
336   f = open("/proc/sys/kernel/random/boot_id", 'r')
337   try:
338     outputarray["bootid"] = f.read(128).rstrip("\n")
339   finally:
340     f.close()
341
342   return outputarray
343
344
345 def VerifyNode(what, cluster_name):
346   """Verify the status of the local node.
347
348   Based on the input L{what} parameter, various checks are done on the
349   local node.
350
351   If the I{filelist} key is present, this list of
352   files is checksummed and the file/checksum pairs are returned.
353
354   If the I{nodelist} key is present, we check that we have
355   connectivity via ssh with the target nodes (and check the hostname
356   report).
357
358   If the I{node-net-test} key is present, we check that we have
359   connectivity to the given nodes via both primary IP and, if
360   applicable, secondary IPs.
361
362   @type what: C{dict}
363   @param what: a dictionary of things to check:
364       - filelist: list of files for which to compute checksums
365       - nodelist: list of nodes we should check ssh communication with
366       - node-net-test: list of nodes we should check node daemon port
367         connectivity with
368       - hypervisor: list with hypervisors to run the verify for
369   @rtype: dict
370   @return: a dictionary with the same keys as the input dict, and
371       values representing the result of the checks
372
373   """
374   result = {}
375
376   if constants.NV_HYPERVISOR in what:
377     result[constants.NV_HYPERVISOR] = tmp = {}
378     for hv_name in what[constants.NV_HYPERVISOR]:
379       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
380
381   if constants.NV_FILELIST in what:
382     result[constants.NV_FILELIST] = utils.FingerprintFiles(
383       what[constants.NV_FILELIST])
384
385   if constants.NV_NODELIST in what:
386     result[constants.NV_NODELIST] = tmp = {}
387     random.shuffle(what[constants.NV_NODELIST])
388     for node in what[constants.NV_NODELIST]:
389       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
390       if not success:
391         tmp[node] = message
392
393   if constants.NV_NODENETTEST in what:
394     result[constants.NV_NODENETTEST] = tmp = {}
395     my_name = utils.HostInfo().name
396     my_pip = my_sip = None
397     for name, pip, sip in what[constants.NV_NODENETTEST]:
398       if name == my_name:
399         my_pip = pip
400         my_sip = sip
401         break
402     if not my_pip:
403       tmp[my_name] = ("Can't find my own primary/secondary IP"
404                       " in the node list")
405     else:
406       port = utils.GetNodeDaemonPort()
407       for name, pip, sip in what[constants.NV_NODENETTEST]:
408         fail = []
409         if not utils.TcpPing(pip, port, source=my_pip):
410           fail.append("primary")
411         if sip != pip:
412           if not utils.TcpPing(sip, port, source=my_sip):
413             fail.append("secondary")
414         if fail:
415           tmp[name] = ("failure using the %s interface(s)" %
416                        " and ".join(fail))
417
418   if constants.NV_LVLIST in what:
419     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
420
421   if constants.NV_INSTANCELIST in what:
422     result[constants.NV_INSTANCELIST] = GetInstanceList(
423       what[constants.NV_INSTANCELIST])
424
425   if constants.NV_VGLIST in what:
426     result[constants.NV_VGLIST] = ListVolumeGroups()
427
428   if constants.NV_VERSION in what:
429     result[constants.NV_VERSION] = constants.PROTOCOL_VERSION
430
431   if constants.NV_HVINFO in what:
432     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
433     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
434
435   if constants.NV_DRBDLIST in what:
436     try:
437       used_minors = bdev.DRBD8.GetUsedDevs().keys()
438     except errors.BlockDeviceError:
439       logging.warning("Can't get used minors list", exc_info=True)
440       used_minors = []
441     result[constants.NV_DRBDLIST] = used_minors
442
443   return result
444
445
446 def GetVolumeList(vg_name):
447   """Compute list of logical volumes and their size.
448
449   @type vg_name: str
450   @param vg_name: the volume group whose LVs we should list
451   @rtype: dict
452   @return:
453       dictionary of all partions (key) with value being a tuple of
454       their size (in MiB), inactive and online status::
455
456         {'test1': ('20.06', True, True)}
457
458       in case of errors, a string is returned with the error
459       details.
460
461   """
462   lvs = {}
463   sep = '|'
464   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
465                          "--separator=%s" % sep,
466                          "-olv_name,lv_size,lv_attr", vg_name])
467   if result.failed:
468     logging.error("Failed to list logical volumes, lvs output: %s",
469                   result.output)
470     return result.output
471
472   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
473   for line in result.stdout.splitlines():
474     line = line.strip()
475     match = valid_line_re.match(line)
476     if not match:
477       logging.error("Invalid line returned from lvs output: '%s'", line)
478       continue
479     name, size, attr = match.groups()
480     inactive = attr[4] == '-'
481     online = attr[5] == 'o'
482     lvs[name] = (size, inactive, online)
483
484   return lvs
485
486
487 def ListVolumeGroups():
488   """List the volume groups and their size.
489
490   @rtype: dict
491   @return: dictionary with keys volume name and values the
492       size of the volume
493
494   """
495   return utils.ListVolumeGroups()
496
497
498 def NodeVolumes():
499   """List all volumes on this node.
500
501   @rtype: list
502   @return:
503     A list of dictionaries, each having four keys:
504       - name: the logical volume name,
505       - size: the size of the logical volume
506       - dev: the physical device on which the LV lives
507       - vg: the volume group to which it belongs
508
509     In case of errors, we return an empty list and log the
510     error.
511
512     Note that since a logical volume can live on multiple physical
513     volumes, the resulting list might include a logical volume
514     multiple times.
515
516   """
517   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
518                          "--separator=|",
519                          "--options=lv_name,lv_size,devices,vg_name"])
520   if result.failed:
521     logging.error("Failed to list logical volumes, lvs output: %s",
522                   result.output)
523     return []
524
525   def parse_dev(dev):
526     if '(' in dev:
527       return dev.split('(')[0]
528     else:
529       return dev
530
531   def map_line(line):
532     return {
533       'name': line[0].strip(),
534       'size': line[1].strip(),
535       'dev': parse_dev(line[2].strip()),
536       'vg': line[3].strip(),
537     }
538
539   return [map_line(line.split('|')) for line in result.stdout.splitlines()
540           if line.count('|') >= 3]
541
542
543 def BridgesExist(bridges_list):
544   """Check if a list of bridges exist on the current node.
545
546   @rtype: boolean
547   @return: C{True} if all of them exist, C{False} otherwise
548
549   """
550   for bridge in bridges_list:
551     if not utils.BridgeExists(bridge):
552       return False
553
554   return True
555
556
557 def GetInstanceList(hypervisor_list):
558   """Provides a list of instances.
559
560   @type hypervisor_list: list
561   @param hypervisor_list: the list of hypervisors to query information
562
563   @rtype: list
564   @return: a list of all running instances on the current node
565     - instance1.example.com
566     - instance2.example.com
567
568   """
569   results = []
570   for hname in hypervisor_list:
571     try:
572       names = hypervisor.GetHypervisor(hname).ListInstances()
573       results.extend(names)
574     except errors.HypervisorError, err:
575       logging.exception("Error enumerating instances for hypevisor %s", hname)
576       # FIXME: should we somehow not propagate this to the master?
577       raise
578
579   return results
580
581
582 def GetInstanceInfo(instance, hname):
583   """Gives back the informations about an instance as a dictionary.
584
585   @type instance: string
586   @param instance: the instance name
587   @type hname: string
588   @param hname: the hypervisor type of the instance
589
590   @rtype: dict
591   @return: dictionary with the following keys:
592       - memory: memory size of instance (int)
593       - state: xen state of instance (string)
594       - time: cpu time of instance (float)
595
596   """
597   output = {}
598
599   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
600   if iinfo is not None:
601     output['memory'] = iinfo[2]
602     output['state'] = iinfo[4]
603     output['time'] = iinfo[5]
604
605   return output
606
607
608 def GetInstanceMigratable(instance):
609   """Gives whether an instance can be migrated.
610
611   @type instance: L{objects.Instance}
612   @param instance: object representing the instance to be checked.
613
614   @rtype: tuple
615   @return: tuple of (result, description) where:
616       - result: whether the instance can be migrated or not
617       - description: a description of the issue, if relevant
618
619   """
620   hyper = hypervisor.GetHypervisor(instance.hypervisor)
621   if instance.name not in hyper.ListInstances():
622     return (False, 'not running')
623
624   for idx in range(len(instance.disks)):
625     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
626     if not os.path.islink(link_name):
627       return (False, 'not restarted since ganeti 1.2.5')
628
629   return (True, '')
630
631
632 def GetAllInstancesInfo(hypervisor_list):
633   """Gather data about all instances.
634
635   This is the equivalent of L{GetInstanceInfo}, except that it
636   computes data for all instances at once, thus being faster if one
637   needs data about more than one instance.
638
639   @type hypervisor_list: list
640   @param hypervisor_list: list of hypervisors to query for instance data
641
642   @rtype: dict
643   @return: dictionary of instance: data, with data having the following keys:
644       - memory: memory size of instance (int)
645       - state: xen state of instance (string)
646       - time: cpu time of instance (float)
647       - vcpus: the number of vcpus
648
649   """
650   output = {}
651
652   for hname in hypervisor_list:
653     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
654     if iinfo:
655       for name, inst_id, memory, vcpus, state, times in iinfo:
656         value = {
657           'memory': memory,
658           'vcpus': vcpus,
659           'state': state,
660           'time': times,
661           }
662         if name in output and output[name] != value:
663           raise errors.HypervisorError("Instance %s running duplicate"
664                                        " with different parameters" % name)
665         output[name] = value
666
667   return output
668
669
670 def AddOSToInstance(instance):
671   """Add an OS to an instance.
672
673   @type instance: L{objects.Instance}
674   @param instance: Instance whose OS is to be installed
675   @rtype: boolean
676   @return: the success of the operation
677
678   """
679   inst_os = OSFromDisk(instance.os)
680
681   create_env = OSEnvironment(instance)
682
683   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
684                                      instance.name, int(time.time()))
685
686   result = utils.RunCmd([inst_os.create_script], env=create_env,
687                         cwd=inst_os.path, output=logfile,)
688   if result.failed:
689     logging.error("os create command '%s' returned error: %s, logfile: %s,"
690                   " output: %s", result.cmd, result.fail_reason, logfile,
691                   result.output)
692     lines = [val.encode("string_escape")
693              for val in utils.TailFile(logfile, lines=20)]
694     return (False, "OS create script failed (%s), last lines in the"
695             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
696
697   return (True, "Successfully installed")
698
699
700 def RunRenameInstance(instance, old_name):
701   """Run the OS rename script for an instance.
702
703   @type instance: L{objects.Instance}
704   @param instance: Instance whose OS is to be installed
705   @type old_name: string
706   @param old_name: previous instance name
707   @rtype: boolean
708   @return: the success of the operation
709
710   """
711   inst_os = OSFromDisk(instance.os)
712
713   rename_env = OSEnvironment(instance)
714   rename_env['OLD_INSTANCE_NAME'] = old_name
715
716   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
717                                            old_name,
718                                            instance.name, int(time.time()))
719
720   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
721                         cwd=inst_os.path, output=logfile)
722
723   if result.failed:
724     logging.error("os create command '%s' returned error: %s output: %s",
725                   result.cmd, result.fail_reason, result.output)
726     lines = [val.encode("string_escape")
727              for val in utils.TailFile(logfile, lines=20)]
728     return (False, "OS rename script failed (%s), last lines in the"
729             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
730
731   return (True, "Rename successful")
732
733
734 def _GetVGInfo(vg_name):
735   """Get informations about the volume group.
736
737   @type vg_name: str
738   @param vg_name: the volume group which we query
739   @rtype: dict
740   @return:
741     A dictionary with the following keys:
742       - C{vg_size} is the total size of the volume group in MiB
743       - C{vg_free} is the free size of the volume group in MiB
744       - C{pv_count} are the number of physical disks in that VG
745
746     If an error occurs during gathering of data, we return the same dict
747     with keys all set to None.
748
749   """
750   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
751
752   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
753                          "--nosuffix", "--units=m", "--separator=:", vg_name])
754
755   if retval.failed:
756     logging.error("volume group %s not present", vg_name)
757     return retdic
758   valarr = retval.stdout.strip().rstrip(':').split(':')
759   if len(valarr) == 3:
760     try:
761       retdic = {
762         "vg_size": int(round(float(valarr[0]), 0)),
763         "vg_free": int(round(float(valarr[1]), 0)),
764         "pv_count": int(valarr[2]),
765         }
766     except ValueError, err:
767       logging.exception("Fail to parse vgs output")
768   else:
769     logging.error("vgs output has the wrong number of fields (expected"
770                   " three): %s", str(valarr))
771   return retdic
772
773
774 def _GetBlockDevSymlinkPath(instance_name, idx):
775   return os.path.join(constants.DISK_LINKS_DIR,
776                       "%s:%d" % (instance_name, idx))
777
778
779 def _SymlinkBlockDev(instance_name, device_path, idx):
780   """Set up symlinks to a instance's block device.
781
782   This is an auxiliary function run when an instance is start (on the primary
783   node) or when an instance is migrated (on the target node).
784
785
786   @param instance_name: the name of the target instance
787   @param device_path: path of the physical block device, on the node
788   @param idx: the disk index
789   @return: absolute path to the disk's symlink
790
791   """
792   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
793   try:
794     os.symlink(device_path, link_name)
795   except OSError, err:
796     if err.errno == errno.EEXIST:
797       if (not os.path.islink(link_name) or
798           os.readlink(link_name) != device_path):
799         os.remove(link_name)
800         os.symlink(device_path, link_name)
801     else:
802       raise
803
804   return link_name
805
806
807 def _RemoveBlockDevLinks(instance_name, disks):
808   """Remove the block device symlinks belonging to the given instance.
809
810   """
811   for idx, disk in enumerate(disks):
812     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
813     if os.path.islink(link_name):
814       try:
815         os.remove(link_name)
816       except OSError:
817         logging.exception("Can't remove symlink '%s'", link_name)
818
819
820 def _GatherAndLinkBlockDevs(instance):
821   """Set up an instance's block device(s).
822
823   This is run on the primary node at instance startup. The block
824   devices must be already assembled.
825
826   @type instance: L{objects.Instance}
827   @param instance: the instance whose disks we shoul assemble
828   @rtype: list
829   @return: list of (disk_object, device_path)
830
831   """
832   block_devices = []
833   for idx, disk in enumerate(instance.disks):
834     device = _RecursiveFindBD(disk)
835     if device is None:
836       raise errors.BlockDeviceError("Block device '%s' is not set up." %
837                                     str(disk))
838     device.Open()
839     try:
840       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
841     except OSError, e:
842       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
843                                     e.strerror)
844
845     block_devices.append((disk, link_name))
846
847   return block_devices
848
849
850 def StartInstance(instance, extra_args):
851   """Start an instance.
852
853   @type instance: L{objects.Instance}
854   @param instance: the instance object
855   @rtype: boolean
856   @return: whether the startup was successful or not
857
858   """
859   running_instances = GetInstanceList([instance.hypervisor])
860
861   if instance.name in running_instances:
862     return (True, "Already running")
863
864   try:
865     block_devices = _GatherAndLinkBlockDevs(instance)
866     hyper = hypervisor.GetHypervisor(instance.hypervisor)
867     hyper.StartInstance(instance, block_devices, extra_args)
868   except errors.BlockDeviceError, err:
869     logging.exception("Failed to start instance")
870     return (False, "Block device error: %s" % str(err))
871   except errors.HypervisorError, err:
872     logging.exception("Failed to start instance")
873     _RemoveBlockDevLinks(instance.name, instance.disks)
874     return (False, "Hypervisor error: %s" % str(err))
875
876   return (True, "Instance started successfully")
877
878
879 def ShutdownInstance(instance):
880   """Shut an instance down.
881
882   @note: this functions uses polling with a hardcoded timeout.
883
884   @type instance: L{objects.Instance}
885   @param instance: the instance object
886   @rtype: boolean
887   @return: whether the startup was successful or not
888
889   """
890   hv_name = instance.hypervisor
891   running_instances = GetInstanceList([hv_name])
892
893   if instance.name not in running_instances:
894     return True
895
896   hyper = hypervisor.GetHypervisor(hv_name)
897   try:
898     hyper.StopInstance(instance)
899   except errors.HypervisorError, err:
900     logging.error("Failed to stop instance: %s" % err)
901     return False
902
903   # test every 10secs for 2min
904
905   time.sleep(1)
906   for dummy in range(11):
907     if instance.name not in GetInstanceList([hv_name]):
908       break
909     time.sleep(10)
910   else:
911     # the shutdown did not succeed
912     logging.error("Shutdown of '%s' unsuccessful, using destroy",
913                   instance.name)
914
915     try:
916       hyper.StopInstance(instance, force=True)
917     except errors.HypervisorError, err:
918       logging.exception("Failed to stop instance: %s" % err)
919       return False
920
921     time.sleep(1)
922     if instance.name in GetInstanceList([hv_name]):
923       logging.error("Could not shutdown instance '%s' even by destroy",
924                     instance.name)
925       return False
926
927   _RemoveBlockDevLinks(instance.name, instance.disks)
928
929   return True
930
931
932 def RebootInstance(instance, reboot_type, extra_args):
933   """Reboot an instance.
934
935   @type instance: L{objects.Instance}
936   @param instance: the instance object to reboot
937   @type reboot_type: str
938   @param reboot_type: the type of reboot, one the following
939     constants:
940       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
941         instance OS, do not recreate the VM
942       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
943         restart the VM (at the hypervisor level)
944       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
945         is not accepted here, since that mode is handled
946         differently
947   @rtype: boolean
948   @return: the success of the operation
949
950   """
951   running_instances = GetInstanceList([instance.hypervisor])
952
953   if instance.name not in running_instances:
954     logging.error("Cannot reboot instance that is not running")
955     return False
956
957   hyper = hypervisor.GetHypervisor(instance.hypervisor)
958   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
959     try:
960       hyper.RebootInstance(instance)
961     except errors.HypervisorError, err:
962       logging.exception("Failed to soft reboot instance")
963       return False
964   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
965     try:
966       ShutdownInstance(instance)
967       StartInstance(instance, extra_args)
968     except errors.HypervisorError, err:
969       logging.exception("Failed to hard reboot instance")
970       return False
971   else:
972     raise errors.ParameterError("reboot_type invalid")
973
974   return True
975
976
977 def MigrationInfo(instance):
978   """Gather information about an instance to be migrated.
979
980   @type instance: L{objects.Instance}
981   @param instance: the instance definition
982
983   """
984   hyper = hypervisor.GetHypervisor(instance.hypervisor)
985   try:
986     info = hyper.MigrationInfo(instance)
987   except errors.HypervisorError, err:
988     msg = "Failed to fetch migration information"
989     logging.exception(msg)
990     return (False, '%s: %s' % (msg, err))
991   return (True, info)
992
993
994 def AcceptInstance(instance, info, target):
995   """Prepare the node to accept an instance.
996
997   @type instance: L{objects.Instance}
998   @param instance: the instance definition
999   @type info: string/data (opaque)
1000   @param info: migration information, from the source node
1001   @type target: string
1002   @param target: target host (usually ip), on this node
1003
1004   """
1005   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1006   try:
1007     hyper.AcceptInstance(instance, info, target)
1008   except errors.HypervisorError, err:
1009     msg = "Failed to accept instance"
1010     logging.exception(msg)
1011     return (False, '%s: %s' % (msg, err))
1012   return (True, "Accept successfull")
1013
1014
1015 def FinalizeMigration(instance, info, success):
1016   """Finalize any preparation to accept an instance.
1017
1018   @type instance: L{objects.Instance}
1019   @param instance: the instance definition
1020   @type info: string/data (opaque)
1021   @param info: migration information, from the source node
1022   @type success: boolean
1023   @param success: whether the migration was a success or a failure
1024
1025   """
1026   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1027   try:
1028     hyper.FinalizeMigration(instance, info, success)
1029   except errors.HypervisorError, err:
1030     msg = "Failed to finalize migration"
1031     logging.exception(msg)
1032     return (False, '%s: %s' % (msg, err))
1033   return (True, "Migration Finalized")
1034
1035
1036 def MigrateInstance(instance, target, live):
1037   """Migrates an instance to another node.
1038
1039   @type instance: L{objects.Instance}
1040   @param instance: the instance definition
1041   @type target: string
1042   @param target: the target node name
1043   @type live: boolean
1044   @param live: whether the migration should be done live or not (the
1045       interpretation of this parameter is left to the hypervisor)
1046   @rtype: tuple
1047   @return: a tuple of (success, msg) where:
1048       - succes is a boolean denoting the success/failure of the operation
1049       - msg is a string with details in case of failure
1050
1051   """
1052   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1053
1054   try:
1055     hyper.MigrateInstance(instance.name, target, live)
1056   except errors.HypervisorError, err:
1057     msg = "Failed to migrate instance"
1058     logging.exception(msg)
1059     return (False, "%s: %s" % (msg, err))
1060   return (True, "Migration successfull")
1061
1062
1063 def CreateBlockDevice(disk, size, owner, on_primary, info):
1064   """Creates a block device for an instance.
1065
1066   @type disk: L{objects.Disk}
1067   @param disk: the object describing the disk we should create
1068   @type size: int
1069   @param size: the size of the physical underlying device, in MiB
1070   @type owner: str
1071   @param owner: the name of the instance for which disk is created,
1072       used for device cache data
1073   @type on_primary: boolean
1074   @param on_primary:  indicates if it is the primary node or not
1075   @type info: string
1076   @param info: string that will be sent to the physical device
1077       creation, used for example to set (LVM) tags on LVs
1078
1079   @return: the new unique_id of the device (this can sometime be
1080       computed only after creation), or None. On secondary nodes,
1081       it's not required to return anything.
1082
1083   """
1084   clist = []
1085   if disk.children:
1086     for child in disk.children:
1087       crdev = _RecursiveAssembleBD(child, owner, on_primary)
1088       if on_primary or disk.AssembleOnSecondary():
1089         # we need the children open in case the device itself has to
1090         # be assembled
1091         crdev.Open()
1092       clist.append(crdev)
1093
1094   try:
1095     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1096   except errors.GenericError, err:
1097     return False, "Can't create block device: %s" % str(err)
1098
1099   if on_primary or disk.AssembleOnSecondary():
1100     if not device.Assemble():
1101       errorstring = "Can't assemble device after creation, very unusual event"
1102       logging.error(errorstring)
1103       return False, errorstring
1104     device.SetSyncSpeed(constants.SYNC_SPEED)
1105     if on_primary or disk.OpenOnSecondary():
1106       device.Open(force=True)
1107     DevCacheManager.UpdateCache(device.dev_path, owner,
1108                                 on_primary, disk.iv_name)
1109
1110   device.SetInfo(info)
1111
1112   physical_id = device.unique_id
1113   return True, physical_id
1114
1115
1116 def RemoveBlockDevice(disk):
1117   """Remove a block device.
1118
1119   @note: This is intended to be called recursively.
1120
1121   @type disk: L{objects.Disk}
1122   @param disk: the disk object we should remove
1123   @rtype: boolean
1124   @return: the success of the operation
1125
1126   """
1127   try:
1128     rdev = _RecursiveFindBD(disk)
1129   except errors.BlockDeviceError, err:
1130     # probably can't attach
1131     logging.info("Can't attach to device %s in remove", disk)
1132     rdev = None
1133   if rdev is not None:
1134     r_path = rdev.dev_path
1135     result = rdev.Remove()
1136     if result:
1137       DevCacheManager.RemoveCache(r_path)
1138   else:
1139     result = True
1140   if disk.children:
1141     for child in disk.children:
1142       result = result and RemoveBlockDevice(child)
1143   return result
1144
1145
1146 def _RecursiveAssembleBD(disk, owner, as_primary):
1147   """Activate a block device for an instance.
1148
1149   This is run on the primary and secondary nodes for an instance.
1150
1151   @note: this function is called recursively.
1152
1153   @type disk: L{objects.Disk}
1154   @param disk: the disk we try to assemble
1155   @type owner: str
1156   @param owner: the name of the instance which owns the disk
1157   @type as_primary: boolean
1158   @param as_primary: if we should make the block device
1159       read/write
1160
1161   @return: the assembled device or None (in case no device
1162       was assembled)
1163   @raise errors.BlockDeviceError: in case there is an error
1164       during the activation of the children or the device
1165       itself
1166
1167   """
1168   children = []
1169   if disk.children:
1170     mcn = disk.ChildrenNeeded()
1171     if mcn == -1:
1172       mcn = 0 # max number of Nones allowed
1173     else:
1174       mcn = len(disk.children) - mcn # max number of Nones
1175     for chld_disk in disk.children:
1176       try:
1177         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1178       except errors.BlockDeviceError, err:
1179         if children.count(None) >= mcn:
1180           raise
1181         cdev = None
1182         logging.debug("Error in child activation: %s", str(err))
1183       children.append(cdev)
1184
1185   if as_primary or disk.AssembleOnSecondary():
1186     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1187     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1188     result = r_dev
1189     if as_primary or disk.OpenOnSecondary():
1190       r_dev.Open()
1191     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1192                                 as_primary, disk.iv_name)
1193
1194   else:
1195     result = True
1196   return result
1197
1198
1199 def AssembleBlockDevice(disk, owner, as_primary):
1200   """Activate a block device for an instance.
1201
1202   This is a wrapper over _RecursiveAssembleBD.
1203
1204   @rtype: str or boolean
1205   @return: a C{/dev/...} path for primary nodes, and
1206       C{True} for secondary nodes
1207
1208   """
1209   result = _RecursiveAssembleBD(disk, owner, as_primary)
1210   if isinstance(result, bdev.BlockDev):
1211     result = result.dev_path
1212   return result
1213
1214
1215 def ShutdownBlockDevice(disk):
1216   """Shut down a block device.
1217
1218   First, if the device is assembled (Attach() is successfull), then
1219   the device is shutdown. Then the children of the device are
1220   shutdown.
1221
1222   This function is called recursively. Note that we don't cache the
1223   children or such, as oppossed to assemble, shutdown of different
1224   devices doesn't require that the upper device was active.
1225
1226   @type disk: L{objects.Disk}
1227   @param disk: the description of the disk we should
1228       shutdown
1229   @rtype: boolean
1230   @return: the success of the operation
1231
1232   """
1233   r_dev = _RecursiveFindBD(disk)
1234   if r_dev is not None:
1235     r_path = r_dev.dev_path
1236     result = r_dev.Shutdown()
1237     if result:
1238       DevCacheManager.RemoveCache(r_path)
1239   else:
1240     result = True
1241   if disk.children:
1242     for child in disk.children:
1243       result = result and ShutdownBlockDevice(child)
1244   return result
1245
1246
1247 def MirrorAddChildren(parent_cdev, new_cdevs):
1248   """Extend a mirrored block device.
1249
1250   @type parent_cdev: L{objects.Disk}
1251   @param parent_cdev: the disk to which we should add children
1252   @type new_cdevs: list of L{objects.Disk}
1253   @param new_cdevs: the list of children which we should add
1254   @rtype: boolean
1255   @return: the success of the operation
1256
1257   """
1258   parent_bdev = _RecursiveFindBD(parent_cdev)
1259   if parent_bdev is None:
1260     logging.error("Can't find parent device")
1261     return False
1262   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1263   if new_bdevs.count(None) > 0:
1264     logging.error("Can't find new device(s) to add: %s:%s",
1265                   new_bdevs, new_cdevs)
1266     return False
1267   parent_bdev.AddChildren(new_bdevs)
1268   return True
1269
1270
1271 def MirrorRemoveChildren(parent_cdev, new_cdevs):
1272   """Shrink a mirrored block device.
1273
1274   @type parent_cdev: L{objects.Disk}
1275   @param parent_cdev: the disk from which we should remove children
1276   @type new_cdevs: list of L{objects.Disk}
1277   @param new_cdevs: the list of children which we should remove
1278   @rtype: boolean
1279   @return: the success of the operation
1280
1281   """
1282   parent_bdev = _RecursiveFindBD(parent_cdev)
1283   if parent_bdev is None:
1284     logging.error("Can't find parent in remove children: %s", parent_cdev)
1285     return False
1286   devs = []
1287   for disk in new_cdevs:
1288     rpath = disk.StaticDevPath()
1289     if rpath is None:
1290       bd = _RecursiveFindBD(disk)
1291       if bd is None:
1292         logging.error("Can't find dynamic device %s while removing children",
1293                       disk)
1294         return False
1295       else:
1296         devs.append(bd.dev_path)
1297     else:
1298       devs.append(rpath)
1299   parent_bdev.RemoveChildren(devs)
1300   return True
1301
1302
1303 def GetMirrorStatus(disks):
1304   """Get the mirroring status of a list of devices.
1305
1306   @type disks: list of L{objects.Disk}
1307   @param disks: the list of disks which we should query
1308   @rtype: disk
1309   @return:
1310       a list of (mirror_done, estimated_time) tuples, which
1311       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1312   @raise errors.BlockDeviceError: if any of the disks cannot be
1313       found
1314
1315   """
1316   stats = []
1317   for dsk in disks:
1318     rbd = _RecursiveFindBD(dsk)
1319     if rbd is None:
1320       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1321     stats.append(rbd.CombinedSyncStatus())
1322   return stats
1323
1324
1325 def _RecursiveFindBD(disk):
1326   """Check if a device is activated.
1327
1328   If so, return informations about the real device.
1329
1330   @type disk: L{objects.Disk}
1331   @param disk: the disk object we need to find
1332
1333   @return: None if the device can't be found,
1334       otherwise the device instance
1335
1336   """
1337   children = []
1338   if disk.children:
1339     for chdisk in disk.children:
1340       children.append(_RecursiveFindBD(chdisk))
1341
1342   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1343
1344
1345 def FindBlockDevice(disk):
1346   """Check if a device is activated.
1347
1348   If it is, return informations about the real device.
1349
1350   @type disk: L{objects.Disk}
1351   @param disk: the disk to find
1352   @rtype: None or tuple
1353   @return: None if the disk cannot be found, otherwise a
1354       tuple (device_path, major, minor, sync_percent,
1355       estimated_time, is_degraded)
1356
1357   """
1358   rbd = _RecursiveFindBD(disk)
1359   if rbd is None:
1360     return rbd
1361   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1362
1363
1364 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1365   """Write a file to the filesystem.
1366
1367   This allows the master to overwrite(!) a file. It will only perform
1368   the operation if the file belongs to a list of configuration files.
1369
1370   @type file_name: str
1371   @param file_name: the target file name
1372   @type data: str
1373   @param data: the new contents of the file
1374   @type mode: int
1375   @param mode: the mode to give the file (can be None)
1376   @type uid: int
1377   @param uid: the owner of the file (can be -1 for default)
1378   @type gid: int
1379   @param gid: the group of the file (can be -1 for default)
1380   @type atime: float
1381   @param atime: the atime to set on the file (can be None)
1382   @type mtime: float
1383   @param mtime: the mtime to set on the file (can be None)
1384   @rtype: boolean
1385   @return: the success of the operation; errors are logged
1386       in the node daemon log
1387
1388   """
1389   if not os.path.isabs(file_name):
1390     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1391                   file_name)
1392     return False
1393
1394   allowed_files = [
1395     constants.CLUSTER_CONF_FILE,
1396     constants.ETC_HOSTS,
1397     constants.SSH_KNOWN_HOSTS_FILE,
1398     constants.VNC_PASSWORD_FILE,
1399     ]
1400
1401   if file_name not in allowed_files:
1402     logging.error("Filename passed to UploadFile not in allowed"
1403                  " upload targets: '%s'", file_name)
1404     return False
1405
1406   raw_data = _Decompress(data)
1407
1408   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1409                   atime=atime, mtime=mtime)
1410   return True
1411
1412
1413 def WriteSsconfFiles(values):
1414   """Update all ssconf files.
1415
1416   Wrapper around the SimpleStore.WriteFiles.
1417
1418   """
1419   ssconf.SimpleStore().WriteFiles(values)
1420
1421
1422 def _ErrnoOrStr(err):
1423   """Format an EnvironmentError exception.
1424
1425   If the L{err} argument has an errno attribute, it will be looked up
1426   and converted into a textual C{E...} description. Otherwise the
1427   string representation of the error will be returned.
1428
1429   @type err: L{EnvironmentError}
1430   @param err: the exception to format
1431
1432   """
1433   if hasattr(err, 'errno'):
1434     detail = errno.errorcode[err.errno]
1435   else:
1436     detail = str(err)
1437   return detail
1438
1439
1440 def _OSOndiskVersion(name, os_dir):
1441   """Compute and return the API version of a given OS.
1442
1443   This function will try to read the API version of the OS given by
1444   the 'name' parameter and residing in the 'os_dir' directory.
1445
1446   @type name: str
1447   @param name: the OS name we should look for
1448   @type os_dir: str
1449   @param os_dir: the directory inwhich we should look for the OS
1450   @rtype: int or None
1451   @return:
1452       Either an integer denoting the version or None in the
1453       case when this is not a valid OS name.
1454   @raise errors.InvalidOS: if the OS cannot be found
1455
1456   """
1457   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1458
1459   try:
1460     st = os.stat(api_file)
1461   except EnvironmentError, err:
1462     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1463                            " found (%s)" % _ErrnoOrStr(err))
1464
1465   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1466     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1467                            " a regular file")
1468
1469   try:
1470     f = open(api_file)
1471     try:
1472       api_versions = f.readlines()
1473     finally:
1474       f.close()
1475   except EnvironmentError, err:
1476     raise errors.InvalidOS(name, os_dir, "error while reading the"
1477                            " API version (%s)" % _ErrnoOrStr(err))
1478
1479   api_versions = [version.strip() for version in api_versions]
1480   try:
1481     api_versions = [int(version) for version in api_versions]
1482   except (TypeError, ValueError), err:
1483     raise errors.InvalidOS(name, os_dir,
1484                            "API version is not integer (%s)" % str(err))
1485
1486   return api_versions
1487
1488
1489 def DiagnoseOS(top_dirs=None):
1490   """Compute the validity for all OSes.
1491
1492   @type top_dirs: list
1493   @param top_dirs: the list of directories in which to
1494       search (if not given defaults to
1495       L{constants.OS_SEARCH_PATH})
1496   @rtype: list of L{objects.OS}
1497   @return: an OS object for each name in all the given
1498       directories
1499
1500   """
1501   if top_dirs is None:
1502     top_dirs = constants.OS_SEARCH_PATH
1503
1504   result = []
1505   for dir_name in top_dirs:
1506     if os.path.isdir(dir_name):
1507       try:
1508         f_names = utils.ListVisibleFiles(dir_name)
1509       except EnvironmentError, err:
1510         logging.exception("Can't list the OS directory %s", dir_name)
1511         break
1512       for name in f_names:
1513         try:
1514           os_inst = OSFromDisk(name, base_dir=dir_name)
1515           result.append(os_inst)
1516         except errors.InvalidOS, err:
1517           result.append(objects.OS.FromInvalidOS(err))
1518
1519   return result
1520
1521
1522 def OSFromDisk(name, base_dir=None):
1523   """Create an OS instance from disk.
1524
1525   This function will return an OS instance if the given name is a
1526   valid OS name. Otherwise, it will raise an appropriate
1527   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1528
1529   @type base_dir: string
1530   @keyword base_dir: Base directory containing OS installations.
1531                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1532   @rtype: L{objects.OS}
1533   @return: the OS instance if we find a valid one
1534   @raise errors.InvalidOS: if we don't find a valid OS
1535
1536   """
1537   if base_dir is None:
1538     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1539     if os_dir is None:
1540       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1541   else:
1542     os_dir = os.path.sep.join([base_dir, name])
1543
1544   api_versions = _OSOndiskVersion(name, os_dir)
1545
1546   if constants.OS_API_VERSION not in api_versions:
1547     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1548                            " (found %s want %s)"
1549                            % (api_versions, constants.OS_API_VERSION))
1550
1551   # OS Scripts dictionary, we will populate it with the actual script names
1552   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1553
1554   for script in os_scripts:
1555     os_scripts[script] = os.path.sep.join([os_dir, script])
1556
1557     try:
1558       st = os.stat(os_scripts[script])
1559     except EnvironmentError, err:
1560       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1561                              (script, _ErrnoOrStr(err)))
1562
1563     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1564       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1565                              script)
1566
1567     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1568       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1569                              script)
1570
1571
1572   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1573                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1574                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1575                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1576                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1577                     api_versions=api_versions)
1578
1579 def OSEnvironment(instance, debug=0):
1580   """Calculate the environment for an os script.
1581
1582   @type instance: L{objects.Instance}
1583   @param instance: target instance for the os script run
1584   @type debug: integer
1585   @param debug: debug level (0 or 1, for OS Api 10)
1586   @rtype: dict
1587   @return: dict of environment variables
1588   @raise errors.BlockDeviceError: if the block device
1589       cannot be found
1590
1591   """
1592   result = {}
1593   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1594   result['INSTANCE_NAME'] = instance.name
1595   result['HYPERVISOR'] = instance.hypervisor
1596   result['DISK_COUNT'] = '%d' % len(instance.disks)
1597   result['NIC_COUNT'] = '%d' % len(instance.nics)
1598   result['DEBUG_LEVEL'] = '%d' % debug
1599   for idx, disk in enumerate(instance.disks):
1600     real_disk = _RecursiveFindBD(disk)
1601     if real_disk is None:
1602       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1603                                     str(disk))
1604     real_disk.Open()
1605     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1606     # FIXME: When disks will have read-only mode, populate this
1607     result['DISK_%d_ACCESS' % idx] = 'W'
1608     if constants.HV_DISK_TYPE in instance.hvparams:
1609       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1610         instance.hvparams[constants.HV_DISK_TYPE]
1611     if disk.dev_type in constants.LDS_BLOCK:
1612       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1613     elif disk.dev_type == constants.LD_FILE:
1614       result['DISK_%d_BACKEND_TYPE' % idx] = \
1615         'file:%s' % disk.physical_id[0]
1616   for idx, nic in enumerate(instance.nics):
1617     result['NIC_%d_MAC' % idx] = nic.mac
1618     if nic.ip:
1619       result['NIC_%d_IP' % idx] = nic.ip
1620     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1621     if constants.HV_NIC_TYPE in instance.hvparams:
1622       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1623         instance.hvparams[constants.HV_NIC_TYPE]
1624
1625   return result
1626
1627 def GrowBlockDevice(disk, amount):
1628   """Grow a stack of block devices.
1629
1630   This function is called recursively, with the childrens being the
1631   first ones to resize.
1632
1633   @type disk: L{objects.Disk}
1634   @param disk: the disk to be grown
1635   @rtype: (status, result)
1636   @return: a tuple with the status of the operation
1637       (True/False), and the errors message if status
1638       is False
1639
1640   """
1641   r_dev = _RecursiveFindBD(disk)
1642   if r_dev is None:
1643     return False, "Cannot find block device %s" % (disk,)
1644
1645   try:
1646     r_dev.Grow(amount)
1647   except errors.BlockDeviceError, err:
1648     return False, str(err)
1649
1650   return True, None
1651
1652
1653 def SnapshotBlockDevice(disk):
1654   """Create a snapshot copy of a block device.
1655
1656   This function is called recursively, and the snapshot is actually created
1657   just for the leaf lvm backend device.
1658
1659   @type disk: L{objects.Disk}
1660   @param disk: the disk to be snapshotted
1661   @rtype: string
1662   @return: snapshot disk path
1663
1664   """
1665   if disk.children:
1666     if len(disk.children) == 1:
1667       # only one child, let's recurse on it
1668       return SnapshotBlockDevice(disk.children[0])
1669     else:
1670       # more than one child, choose one that matches
1671       for child in disk.children:
1672         if child.size == disk.size:
1673           # return implies breaking the loop
1674           return SnapshotBlockDevice(child)
1675   elif disk.dev_type == constants.LD_LV:
1676     r_dev = _RecursiveFindBD(disk)
1677     if r_dev is not None:
1678       # let's stay on the safe side and ask for the full size, for now
1679       return r_dev.Snapshot(disk.size)
1680     else:
1681       return None
1682   else:
1683     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1684                                  " '%s' of type '%s'" %
1685                                  (disk.unique_id, disk.dev_type))
1686
1687
1688 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1689   """Export a block device snapshot to a remote node.
1690
1691   @type disk: L{objects.Disk}
1692   @param disk: the description of the disk to export
1693   @type dest_node: str
1694   @param dest_node: the destination node to export to
1695   @type instance: L{objects.Instance}
1696   @param instance: the instance object to whom the disk belongs
1697   @type cluster_name: str
1698   @param cluster_name: the cluster name, needed for SSH hostalias
1699   @type idx: int
1700   @param idx: the index of the disk in the instance's disk list,
1701       used to export to the OS scripts environment
1702   @rtype: boolean
1703   @return: the success of the operation
1704
1705   """
1706   export_env = OSEnvironment(instance)
1707
1708   inst_os = OSFromDisk(instance.os)
1709   export_script = inst_os.export_script
1710
1711   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1712                                      instance.name, int(time.time()))
1713   if not os.path.exists(constants.LOG_OS_DIR):
1714     os.mkdir(constants.LOG_OS_DIR, 0750)
1715   real_disk = _RecursiveFindBD(disk)
1716   if real_disk is None:
1717     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1718                                   str(disk))
1719   real_disk.Open()
1720
1721   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1722   export_env['EXPORT_INDEX'] = str(idx)
1723
1724   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1725   destfile = disk.physical_id[1]
1726
1727   # the target command is built out of three individual commands,
1728   # which are joined by pipes; we check each individual command for
1729   # valid parameters
1730   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1731                                export_script, logfile)
1732
1733   comprcmd = "gzip"
1734
1735   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1736                                 destdir, destdir, destfile)
1737   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1738                                                    constants.GANETI_RUNAS,
1739                                                    destcmd)
1740
1741   # all commands have been checked, so we're safe to combine them
1742   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1743
1744   result = utils.RunCmd(command, env=export_env)
1745
1746   if result.failed:
1747     logging.error("os snapshot export command '%s' returned error: %s"
1748                   " output: %s", command, result.fail_reason, result.output)
1749     return False
1750
1751   return True
1752
1753
1754 def FinalizeExport(instance, snap_disks):
1755   """Write out the export configuration information.
1756
1757   @type instance: L{objects.Instance}
1758   @param instance: the instance which we export, used for
1759       saving configuration
1760   @type snap_disks: list of L{objects.Disk}
1761   @param snap_disks: list of snapshot block devices, which
1762       will be used to get the actual name of the dump file
1763
1764   @rtype: boolean
1765   @return: the success of the operation
1766
1767   """
1768   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1769   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1770
1771   config = objects.SerializableConfigParser()
1772
1773   config.add_section(constants.INISECT_EXP)
1774   config.set(constants.INISECT_EXP, 'version', '0')
1775   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1776   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1777   config.set(constants.INISECT_EXP, 'os', instance.os)
1778   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1779
1780   config.add_section(constants.INISECT_INS)
1781   config.set(constants.INISECT_INS, 'name', instance.name)
1782   config.set(constants.INISECT_INS, 'memory', '%d' %
1783              instance.beparams[constants.BE_MEMORY])
1784   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1785              instance.beparams[constants.BE_VCPUS])
1786   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1787
1788   nic_total = 0
1789   for nic_count, nic in enumerate(instance.nics):
1790     nic_total += 1
1791     config.set(constants.INISECT_INS, 'nic%d_mac' %
1792                nic_count, '%s' % nic.mac)
1793     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1794     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1795                '%s' % nic.bridge)
1796   # TODO: redundant: on load can read nics until it doesn't exist
1797   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1798
1799   disk_total = 0
1800   for disk_count, disk in enumerate(snap_disks):
1801     if disk:
1802       disk_total += 1
1803       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1804                  ('%s' % disk.iv_name))
1805       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1806                  ('%s' % disk.physical_id[1]))
1807       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1808                  ('%d' % disk.size))
1809
1810   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1811
1812   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1813                   data=config.Dumps())
1814   shutil.rmtree(finaldestdir, True)
1815   shutil.move(destdir, finaldestdir)
1816
1817   return True
1818
1819
1820 def ExportInfo(dest):
1821   """Get export configuration information.
1822
1823   @type dest: str
1824   @param dest: directory containing the export
1825
1826   @rtype: L{objects.SerializableConfigParser}
1827   @return: a serializable config file containing the
1828       export info
1829
1830   """
1831   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1832
1833   config = objects.SerializableConfigParser()
1834   config.read(cff)
1835
1836   if (not config.has_section(constants.INISECT_EXP) or
1837       not config.has_section(constants.INISECT_INS)):
1838     return None
1839
1840   return config
1841
1842
1843 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1844   """Import an os image into an instance.
1845
1846   @type instance: L{objects.Instance}
1847   @param instance: instance to import the disks into
1848   @type src_node: string
1849   @param src_node: source node for the disk images
1850   @type src_images: list of string
1851   @param src_images: absolute paths of the disk images
1852   @rtype: list of boolean
1853   @return: each boolean represent the success of importing the n-th disk
1854
1855   """
1856   import_env = OSEnvironment(instance)
1857   inst_os = OSFromDisk(instance.os)
1858   import_script = inst_os.import_script
1859
1860   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1861                                         instance.name, int(time.time()))
1862   if not os.path.exists(constants.LOG_OS_DIR):
1863     os.mkdir(constants.LOG_OS_DIR, 0750)
1864
1865   comprcmd = "gunzip"
1866   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1867                                import_script, logfile)
1868
1869   final_result = []
1870   for idx, image in enumerate(src_images):
1871     if image:
1872       destcmd = utils.BuildShellCmd('cat %s', image)
1873       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1874                                                        constants.GANETI_RUNAS,
1875                                                        destcmd)
1876       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1877       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1878       import_env['IMPORT_INDEX'] = str(idx)
1879       result = utils.RunCmd(command, env=import_env)
1880       if result.failed:
1881         logging.error("Disk import command '%s' returned error: %s"
1882                       " output: %s", command, result.fail_reason,
1883                       result.output)
1884         final_result.append(False)
1885       else:
1886         final_result.append(True)
1887     else:
1888       final_result.append(True)
1889
1890   return final_result
1891
1892
1893 def ListExports():
1894   """Return a list of exports currently available on this machine.
1895
1896   @rtype: list
1897   @return: list of the exports
1898
1899   """
1900   if os.path.isdir(constants.EXPORT_DIR):
1901     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1902   else:
1903     return []
1904
1905
1906 def RemoveExport(export):
1907   """Remove an existing export from the node.
1908
1909   @type export: str
1910   @param export: the name of the export to remove
1911   @rtype: boolean
1912   @return: the success of the operation
1913
1914   """
1915   target = os.path.join(constants.EXPORT_DIR, export)
1916
1917   shutil.rmtree(target)
1918   # TODO: catch some of the relevant exceptions and provide a pretty
1919   # error message if rmtree fails.
1920
1921   return True
1922
1923
1924 def RenameBlockDevices(devlist):
1925   """Rename a list of block devices.
1926
1927   @type devlist: list of tuples
1928   @param devlist: list of tuples of the form  (disk,
1929       new_logical_id, new_physical_id); disk is an
1930       L{objects.Disk} object describing the current disk,
1931       and new logical_id/physical_id is the name we
1932       rename it to
1933   @rtype: boolean
1934   @return: True if all renames succeeded, False otherwise
1935
1936   """
1937   result = True
1938   for disk, unique_id in devlist:
1939     dev = _RecursiveFindBD(disk)
1940     if dev is None:
1941       result = False
1942       continue
1943     try:
1944       old_rpath = dev.dev_path
1945       dev.Rename(unique_id)
1946       new_rpath = dev.dev_path
1947       if old_rpath != new_rpath:
1948         DevCacheManager.RemoveCache(old_rpath)
1949         # FIXME: we should add the new cache information here, like:
1950         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1951         # but we don't have the owner here - maybe parse from existing
1952         # cache? for now, we only lose lvm data when we rename, which
1953         # is less critical than DRBD or MD
1954     except errors.BlockDeviceError, err:
1955       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1956       result = False
1957   return result
1958
1959
1960 def _TransformFileStorageDir(file_storage_dir):
1961   """Checks whether given file_storage_dir is valid.
1962
1963   Checks wheter the given file_storage_dir is within the cluster-wide
1964   default file_storage_dir stored in SimpleStore. Only paths under that
1965   directory are allowed.
1966
1967   @type file_storage_dir: str
1968   @param file_storage_dir: the path to check
1969
1970   @return: the normalized path if valid, None otherwise
1971
1972   """
1973   cfg = _GetConfig()
1974   file_storage_dir = os.path.normpath(file_storage_dir)
1975   base_file_storage_dir = cfg.GetFileStorageDir()
1976   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1977       base_file_storage_dir):
1978     logging.error("file storage directory '%s' is not under base file"
1979                   " storage directory '%s'",
1980                   file_storage_dir, base_file_storage_dir)
1981     return None
1982   return file_storage_dir
1983
1984
1985 def CreateFileStorageDir(file_storage_dir):
1986   """Create file storage directory.
1987
1988   @type file_storage_dir: str
1989   @param file_storage_dir: directory to create
1990
1991   @rtype: tuple
1992   @return: tuple with first element a boolean indicating wheter dir
1993       creation was successful or not
1994
1995   """
1996   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1997   result = True,
1998   if not file_storage_dir:
1999     result = False,
2000   else:
2001     if os.path.exists(file_storage_dir):
2002       if not os.path.isdir(file_storage_dir):
2003         logging.error("'%s' is not a directory", file_storage_dir)
2004         result = False,
2005     else:
2006       try:
2007         os.makedirs(file_storage_dir, 0750)
2008       except OSError, err:
2009         logging.error("Cannot create file storage directory '%s': %s",
2010                       file_storage_dir, err)
2011         result = False,
2012   return result
2013
2014
2015 def RemoveFileStorageDir(file_storage_dir):
2016   """Remove file storage directory.
2017
2018   Remove it only if it's empty. If not log an error and return.
2019
2020   @type file_storage_dir: str
2021   @param file_storage_dir: the directory we should cleanup
2022   @rtype: tuple (success,)
2023   @return: tuple of one element, C{success}, denoting
2024       whether the operation was successfull
2025
2026   """
2027   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2028   result = True,
2029   if not file_storage_dir:
2030     result = False,
2031   else:
2032     if os.path.exists(file_storage_dir):
2033       if not os.path.isdir(file_storage_dir):
2034         logging.error("'%s' is not a directory", file_storage_dir)
2035         result = False,
2036       # deletes dir only if empty, otherwise we want to return False
2037       try:
2038         os.rmdir(file_storage_dir)
2039       except OSError, err:
2040         logging.exception("Cannot remove file storage directory '%s'",
2041                           file_storage_dir)
2042         result = False,
2043   return result
2044
2045
2046 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2047   """Rename the file storage directory.
2048
2049   @type old_file_storage_dir: str
2050   @param old_file_storage_dir: the current path
2051   @type new_file_storage_dir: str
2052   @param new_file_storage_dir: the name we should rename to
2053   @rtype: tuple (success,)
2054   @return: tuple of one element, C{success}, denoting
2055       whether the operation was successful
2056
2057   """
2058   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2059   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2060   result = True,
2061   if not old_file_storage_dir or not new_file_storage_dir:
2062     result = False,
2063   else:
2064     if not os.path.exists(new_file_storage_dir):
2065       if os.path.isdir(old_file_storage_dir):
2066         try:
2067           os.rename(old_file_storage_dir, new_file_storage_dir)
2068         except OSError, err:
2069           logging.exception("Cannot rename '%s' to '%s'",
2070                             old_file_storage_dir, new_file_storage_dir)
2071           result =  False,
2072       else:
2073         logging.error("'%s' is not a directory", old_file_storage_dir)
2074         result = False,
2075     else:
2076       if os.path.exists(old_file_storage_dir):
2077         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2078                       old_file_storage_dir, new_file_storage_dir)
2079         result = False,
2080   return result
2081
2082
2083 def _IsJobQueueFile(file_name):
2084   """Checks whether the given filename is in the queue directory.
2085
2086   @type file_name: str
2087   @param file_name: the file name we should check
2088   @rtype: boolean
2089   @return: whether the file is under the queue directory
2090
2091   """
2092   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2093   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2094
2095   if not result:
2096     logging.error("'%s' is not a file in the queue directory",
2097                   file_name)
2098
2099   return result
2100
2101
2102 def JobQueueUpdate(file_name, content):
2103   """Updates a file in the queue directory.
2104
2105   This is just a wrapper over L{utils.WriteFile}, with proper
2106   checking.
2107
2108   @type file_name: str
2109   @param file_name: the job file name
2110   @type content: str
2111   @param content: the new job contents
2112   @rtype: boolean
2113   @return: the success of the operation
2114
2115   """
2116   if not _IsJobQueueFile(file_name):
2117     return False
2118
2119   # Write and replace the file atomically
2120   utils.WriteFile(file_name, data=_Decompress(content))
2121
2122   return True
2123
2124
2125 def JobQueueRename(old, new):
2126   """Renames a job queue file.
2127
2128   This is just a wrapper over os.rename with proper checking.
2129
2130   @type old: str
2131   @param old: the old (actual) file name
2132   @type new: str
2133   @param new: the desired file name
2134   @rtype: boolean
2135   @return: the success of the operation
2136
2137   """
2138   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2139     return False
2140
2141   utils.RenameFile(old, new, mkdir=True)
2142
2143   return True
2144
2145
2146 def JobQueueSetDrainFlag(drain_flag):
2147   """Set the drain flag for the queue.
2148
2149   This will set or unset the queue drain flag.
2150
2151   @type drain_flag: boolean
2152   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2153   @rtype: boolean
2154   @return: always True
2155   @warning: the function always returns True
2156
2157   """
2158   if drain_flag:
2159     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2160   else:
2161     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2162
2163   return True
2164
2165
2166 def CloseBlockDevices(instance_name, disks):
2167   """Closes the given block devices.
2168
2169   This means they will be switched to secondary mode (in case of
2170   DRBD).
2171
2172   @param instance_name: if the argument is not empty, the symlinks
2173       of this instance will be removed
2174   @type disks: list of L{objects.Disk}
2175   @param disks: the list of disks to be closed
2176   @rtype: tuple (success, message)
2177   @return: a tuple of success and message, where success
2178       indicates the succes of the operation, and message
2179       which will contain the error details in case we
2180       failed
2181
2182   """
2183   bdevs = []
2184   for cf in disks:
2185     rd = _RecursiveFindBD(cf)
2186     if rd is None:
2187       return (False, "Can't find device %s" % cf)
2188     bdevs.append(rd)
2189
2190   msg = []
2191   for rd in bdevs:
2192     try:
2193       rd.Close()
2194     except errors.BlockDeviceError, err:
2195       msg.append(str(err))
2196   if msg:
2197     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2198   else:
2199     if instance_name:
2200       _RemoveBlockDevLinks(instance_name, disks)
2201     return (True, "All devices secondary")
2202
2203
2204 def ValidateHVParams(hvname, hvparams):
2205   """Validates the given hypervisor parameters.
2206
2207   @type hvname: string
2208   @param hvname: the hypervisor name
2209   @type hvparams: dict
2210   @param hvparams: the hypervisor parameters to be validated
2211   @rtype: tuple (success, message)
2212   @return: a tuple of success and message, where success
2213       indicates the succes of the operation, and message
2214       which will contain the error details in case we
2215       failed
2216
2217   """
2218   try:
2219     hv_type = hypervisor.GetHypervisor(hvname)
2220     hv_type.ValidateParameters(hvparams)
2221     return (True, "Validation passed")
2222   except errors.HypervisorError, err:
2223     return (False, str(err))
2224
2225
2226 def DemoteFromMC():
2227   """Demotes the current node from master candidate role.
2228
2229   """
2230   # try to ensure we're not the master by mistake
2231   master, myself = ssconf.GetMasterAndMyself()
2232   if master == myself:
2233     return (False, "ssconf status shows I'm the master node, will not demote")
2234   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2235   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2236     return (False, "The master daemon is running, will not demote")
2237   try:
2238     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2239   except EnvironmentError, err:
2240     if err.errno != errno.ENOENT:
2241       return (False, "Error while backing up cluster file: %s" % str(err))
2242   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2243   return (True, "Done")
2244
2245
2246 def _FindDisks(nodes_ip, disks):
2247   """Sets the physical ID on disks and returns the block devices.
2248
2249   """
2250   # set the correct physical ID
2251   my_name = utils.HostInfo().name
2252   for cf in disks:
2253     cf.SetPhysicalID(my_name, nodes_ip)
2254
2255   bdevs = []
2256
2257   for cf in disks:
2258     rd = _RecursiveFindBD(cf)
2259     if rd is None:
2260       return (False, "Can't find device %s" % cf)
2261     bdevs.append(rd)
2262   return (True, bdevs)
2263
2264
2265 def DrbdDisconnectNet(nodes_ip, disks):
2266   """Disconnects the network on a list of drbd devices.
2267
2268   """
2269   status, bdevs = _FindDisks(nodes_ip, disks)
2270   if not status:
2271     return status, bdevs
2272
2273   # disconnect disks
2274   for rd in bdevs:
2275     try:
2276       rd.DisconnectNet()
2277     except errors.BlockDeviceError, err:
2278       logging.exception("Failed to go into standalone mode")
2279       return (False, "Can't change network configuration: %s" % str(err))
2280   return (True, "All disks are now disconnected")
2281
2282
2283 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2284   """Attaches the network on a list of drbd devices.
2285
2286   """
2287   status, bdevs = _FindDisks(nodes_ip, disks)
2288   if not status:
2289     return status, bdevs
2290
2291   if multimaster:
2292     for idx, rd in enumerate(bdevs):
2293       try:
2294         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2295       except EnvironmentError, err:
2296         return (False, "Can't create symlink: %s" % str(err))
2297   # reconnect disks, switch to new master configuration and if
2298   # needed primary mode
2299   for rd in bdevs:
2300     try:
2301       rd.AttachNet(multimaster)
2302     except errors.BlockDeviceError, err:
2303       return (False, "Can't change network configuration: %s" % str(err))
2304   # wait until the disks are connected; we need to retry the re-attach
2305   # if the device becomes standalone, as this might happen if the one
2306   # node disconnects and reconnects in a different mode before the
2307   # other node reconnects; in this case, one or both of the nodes will
2308   # decide it has wrong configuration and switch to standalone
2309   RECONNECT_TIMEOUT = 2 * 60
2310   sleep_time = 0.100 # start with 100 miliseconds
2311   timeout_limit = time.time() + RECONNECT_TIMEOUT
2312   while time.time() < timeout_limit:
2313     all_connected = True
2314     for rd in bdevs:
2315       stats = rd.GetProcStatus()
2316       if not (stats.is_connected or stats.is_in_resync):
2317         all_connected = False
2318       if stats.is_standalone:
2319         # peer had different config info and this node became
2320         # standalone, even though this should not happen with the
2321         # new staged way of changing disk configs
2322         try:
2323           rd.ReAttachNet(multimaster)
2324         except errors.BlockDeviceError, err:
2325           return (False, "Can't change network configuration: %s" % str(err))
2326     if all_connected:
2327       break
2328     time.sleep(sleep_time)
2329     sleep_time = min(5, sleep_time * 1.5)
2330   if not all_connected:
2331     return (False, "Timeout in disk reconnecting")
2332   if multimaster:
2333     # change to primary mode
2334     for rd in bdevs:
2335       rd.Open()
2336   if multimaster:
2337     msg = "multi-master and primary"
2338   else:
2339     msg = "single-master"
2340   return (True, "Disks are now configured as %s" % msg)
2341
2342
2343 def DrbdWaitSync(nodes_ip, disks):
2344   """Wait until DRBDs have synchronized.
2345
2346   """
2347   status, bdevs = _FindDisks(nodes_ip, disks)
2348   if not status:
2349     return status, bdevs
2350
2351   min_resync = 100
2352   alldone = True
2353   failure = False
2354   for rd in bdevs:
2355     stats = rd.GetProcStatus()
2356     if not (stats.is_connected or stats.is_in_resync):
2357       failure = True
2358       break
2359     alldone = alldone and (not stats.is_in_resync)
2360     if stats.sync_percent is not None:
2361       min_resync = min(min_resync, stats.sync_percent)
2362   return (not failure, (alldone, min_resync))
2363
2364
2365 class HooksRunner(object):
2366   """Hook runner.
2367
2368   This class is instantiated on the node side (ganeti-noded) and not
2369   on the master side.
2370
2371   """
2372   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2373
2374   def __init__(self, hooks_base_dir=None):
2375     """Constructor for hooks runner.
2376
2377     @type hooks_base_dir: str or None
2378     @param hooks_base_dir: if not None, this overrides the
2379         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2380
2381     """
2382     if hooks_base_dir is None:
2383       hooks_base_dir = constants.HOOKS_BASE_DIR
2384     self._BASE_DIR = hooks_base_dir
2385
2386   @staticmethod
2387   def ExecHook(script, env):
2388     """Exec one hook script.
2389
2390     @type script: str
2391     @param script: the full path to the script
2392     @type env: dict
2393     @param env: the environment with which to exec the script
2394     @rtype: tuple (success, message)
2395     @return: a tuple of success and message, where success
2396         indicates the succes of the operation, and message
2397         which will contain the error details in case we
2398         failed
2399
2400     """
2401     # exec the process using subprocess and log the output
2402     fdstdin = None
2403     try:
2404       fdstdin = open("/dev/null", "r")
2405       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2406                                stderr=subprocess.STDOUT, close_fds=True,
2407                                shell=False, cwd="/", env=env)
2408       output = ""
2409       try:
2410         output = child.stdout.read(4096)
2411         child.stdout.close()
2412       except EnvironmentError, err:
2413         output += "Hook script error: %s" % str(err)
2414
2415       while True:
2416         try:
2417           result = child.wait()
2418           break
2419         except EnvironmentError, err:
2420           if err.errno == errno.EINTR:
2421             continue
2422           raise
2423     finally:
2424       # try not to leak fds
2425       for fd in (fdstdin, ):
2426         if fd is not None:
2427           try:
2428             fd.close()
2429           except EnvironmentError, err:
2430             # just log the error
2431             #logging.exception("Error while closing fd %s", fd)
2432             pass
2433
2434     return result == 0, output
2435
2436   def RunHooks(self, hpath, phase, env):
2437     """Run the scripts in the hooks directory.
2438
2439     @type hpath: str
2440     @param hpath: the path to the hooks directory which
2441         holds the scripts
2442     @type phase: str
2443     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2444         L{constants.HOOKS_PHASE_POST}
2445     @type env: dict
2446     @param env: dictionary with the environment for the hook
2447     @rtype: list
2448     @return: list of 3-element tuples:
2449       - script path
2450       - script result, either L{constants.HKR_SUCCESS} or
2451         L{constants.HKR_FAIL}
2452       - output of the script
2453
2454     @raise errors.ProgrammerError: for invalid input
2455         parameters
2456
2457     """
2458     if phase == constants.HOOKS_PHASE_PRE:
2459       suffix = "pre"
2460     elif phase == constants.HOOKS_PHASE_POST:
2461       suffix = "post"
2462     else:
2463       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2464     rr = []
2465
2466     subdir = "%s-%s.d" % (hpath, suffix)
2467     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2468     try:
2469       dir_contents = utils.ListVisibleFiles(dir_name)
2470     except OSError, err:
2471       # FIXME: must log output in case of failures
2472       return rr
2473
2474     # we use the standard python sort order,
2475     # so 00name is the recommended naming scheme
2476     dir_contents.sort()
2477     for relname in dir_contents:
2478       fname = os.path.join(dir_name, relname)
2479       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2480           self.RE_MASK.match(relname) is not None):
2481         rrval = constants.HKR_SKIP
2482         output = ""
2483       else:
2484         result, output = self.ExecHook(fname, env)
2485         if not result:
2486           rrval = constants.HKR_FAIL
2487         else:
2488           rrval = constants.HKR_SUCCESS
2489       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2490
2491     return rr
2492
2493
2494 class IAllocatorRunner(object):
2495   """IAllocator runner.
2496
2497   This class is instantiated on the node side (ganeti-noded) and not on
2498   the master side.
2499
2500   """
2501   def Run(self, name, idata):
2502     """Run an iallocator script.
2503
2504     @type name: str
2505     @param name: the iallocator script name
2506     @type idata: str
2507     @param idata: the allocator input data
2508
2509     @rtype: tuple
2510     @return: four element tuple of:
2511        - run status (one of the IARUN_ constants)
2512        - stdout
2513        - stderr
2514        - fail reason (as from L{utils.RunResult})
2515
2516     """
2517     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2518                                   os.path.isfile)
2519     if alloc_script is None:
2520       return (constants.IARUN_NOTFOUND, None, None, None)
2521
2522     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2523     try:
2524       os.write(fd, idata)
2525       os.close(fd)
2526       result = utils.RunCmd([alloc_script, fin_name])
2527       if result.failed:
2528         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2529                 result.fail_reason)
2530     finally:
2531       os.unlink(fin_name)
2532
2533     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2534
2535
2536 class DevCacheManager(object):
2537   """Simple class for managing a cache of block device information.
2538
2539   """
2540   _DEV_PREFIX = "/dev/"
2541   _ROOT_DIR = constants.BDEV_CACHE_DIR
2542
2543   @classmethod
2544   def _ConvertPath(cls, dev_path):
2545     """Converts a /dev/name path to the cache file name.
2546
2547     This replaces slashes with underscores and strips the /dev
2548     prefix. It then returns the full path to the cache file.
2549
2550     @type dev_path: str
2551     @param dev_path: the C{/dev/} path name
2552     @rtype: str
2553     @return: the converted path name
2554
2555     """
2556     if dev_path.startswith(cls._DEV_PREFIX):
2557       dev_path = dev_path[len(cls._DEV_PREFIX):]
2558     dev_path = dev_path.replace("/", "_")
2559     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2560     return fpath
2561
2562   @classmethod
2563   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2564     """Updates the cache information for a given device.
2565
2566     @type dev_path: str
2567     @param dev_path: the pathname of the device
2568     @type owner: str
2569     @param owner: the owner (instance name) of the device
2570     @type on_primary: bool
2571     @param on_primary: whether this is the primary
2572         node nor not
2573     @type iv_name: str
2574     @param iv_name: the instance-visible name of the
2575         device, as in objects.Disk.iv_name
2576
2577     @rtype: None
2578
2579     """
2580     if dev_path is None:
2581       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2582       return
2583     fpath = cls._ConvertPath(dev_path)
2584     if on_primary:
2585       state = "primary"
2586     else:
2587       state = "secondary"
2588     if iv_name is None:
2589       iv_name = "not_visible"
2590     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2591     try:
2592       utils.WriteFile(fpath, data=fdata)
2593     except EnvironmentError, err:
2594       logging.exception("Can't update bdev cache for %s", dev_path)
2595
2596   @classmethod
2597   def RemoveCache(cls, dev_path):
2598     """Remove data for a dev_path.
2599
2600     This is just a wrapper over L{utils.RemoveFile} with a converted
2601     path name and logging.
2602
2603     @type dev_path: str
2604     @param dev_path: the pathname of the device
2605
2606     @rtype: None
2607
2608     """
2609     if dev_path is None:
2610       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2611       return
2612     fpath = cls._ConvertPath(dev_path)
2613     try:
2614       utils.RemoveFile(fpath)
2615     except EnvironmentError, err:
2616       logging.exception("Can't update bdev cache for %s", dev_path)