Fix some more pylint errors
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     logging.exception("Error while processing user ssh files")
264     return False
265
266   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
267     utils.WriteFile(name, data=content, mode=0600)
268
269   utils.AddAuthorizedKey(auth_keys, sshpub)
270
271   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
272
273   return True
274
275
276 def LeaveCluster():
277   """Cleans up and remove the current node.
278
279   This function cleans up and prepares the current node to be removed
280   from the cluster.
281
282   If processing is successful, then it raises an
283   L{errors.QuitGanetiException} which is used as a special case to
284   shutdown the node daemon.
285
286   """
287   _CleanDirectory(constants.DATA_DIR)
288   JobQueuePurge()
289
290   try:
291     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
292   except errors.OpExecError:
293     logging.exception("Error while processing ssh files")
294     return
295
296   f = open(pub_key, 'r')
297   try:
298     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
299   finally:
300     f.close()
301
302   utils.RemoveFile(priv_key)
303   utils.RemoveFile(pub_key)
304
305   # Return a reassuring string to the caller, and quit
306   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
307
308
309 def GetNodeInfo(vgname, hypervisor_type):
310   """Gives back a hash with different informations about the node.
311
312   @type vgname: C{string}
313   @param vgname: the name of the volume group to ask for disk space information
314   @type hypervisor_type: C{str}
315   @param hypervisor_type: the name of the hypervisor to ask for
316       memory information
317   @rtype: C{dict}
318   @return: dictionary with the following keys:
319       - vg_size is the size of the configured volume group in MiB
320       - vg_free is the free size of the volume group in MiB
321       - memory_dom0 is the memory allocated for domain0 in MiB
322       - memory_free is the currently available (free) ram in MiB
323       - memory_total is the total number of ram in MiB
324
325   """
326   outputarray = {}
327   vginfo = _GetVGInfo(vgname)
328   outputarray['vg_size'] = vginfo['vg_size']
329   outputarray['vg_free'] = vginfo['vg_free']
330
331   hyper = hypervisor.GetHypervisor(hypervisor_type)
332   hyp_info = hyper.GetNodeInfo()
333   if hyp_info is not None:
334     outputarray.update(hyp_info)
335
336   f = open("/proc/sys/kernel/random/boot_id", 'r')
337   try:
338     outputarray["bootid"] = f.read(128).rstrip("\n")
339   finally:
340     f.close()
341
342   return outputarray
343
344
345 def VerifyNode(what, cluster_name):
346   """Verify the status of the local node.
347
348   Based on the input L{what} parameter, various checks are done on the
349   local node.
350
351   If the I{filelist} key is present, this list of
352   files is checksummed and the file/checksum pairs are returned.
353
354   If the I{nodelist} key is present, we check that we have
355   connectivity via ssh with the target nodes (and check the hostname
356   report).
357
358   If the I{node-net-test} key is present, we check that we have
359   connectivity to the given nodes via both primary IP and, if
360   applicable, secondary IPs.
361
362   @type what: C{dict}
363   @param what: a dictionary of things to check:
364       - filelist: list of files for which to compute checksums
365       - nodelist: list of nodes we should check ssh communication with
366       - node-net-test: list of nodes we should check node daemon port
367         connectivity with
368       - hypervisor: list with hypervisors to run the verify for
369   @rtype: dict
370   @return: a dictionary with the same keys as the input dict, and
371       values representing the result of the checks
372
373   """
374   result = {}
375
376   if constants.NV_HYPERVISOR in what:
377     result[constants.NV_HYPERVISOR] = tmp = {}
378     for hv_name in what[constants.NV_HYPERVISOR]:
379       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
380
381   if constants.NV_FILELIST in what:
382     result[constants.NV_FILELIST] = utils.FingerprintFiles(
383       what[constants.NV_FILELIST])
384
385   if constants.NV_NODELIST in what:
386     result[constants.NV_NODELIST] = tmp = {}
387     random.shuffle(what[constants.NV_NODELIST])
388     for node in what[constants.NV_NODELIST]:
389       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
390       if not success:
391         tmp[node] = message
392
393   if constants.NV_NODENETTEST in what:
394     result[constants.NV_NODENETTEST] = tmp = {}
395     my_name = utils.HostInfo().name
396     my_pip = my_sip = None
397     for name, pip, sip in what[constants.NV_NODENETTEST]:
398       if name == my_name:
399         my_pip = pip
400         my_sip = sip
401         break
402     if not my_pip:
403       tmp[my_name] = ("Can't find my own primary/secondary IP"
404                       " in the node list")
405     else:
406       port = utils.GetNodeDaemonPort()
407       for name, pip, sip in what[constants.NV_NODENETTEST]:
408         fail = []
409         if not utils.TcpPing(pip, port, source=my_pip):
410           fail.append("primary")
411         if sip != pip:
412           if not utils.TcpPing(sip, port, source=my_sip):
413             fail.append("secondary")
414         if fail:
415           tmp[name] = ("failure using the %s interface(s)" %
416                        " and ".join(fail))
417
418   if constants.NV_LVLIST in what:
419     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
420
421   if constants.NV_INSTANCELIST in what:
422     result[constants.NV_INSTANCELIST] = GetInstanceList(
423       what[constants.NV_INSTANCELIST])
424
425   if constants.NV_VGLIST in what:
426     result[constants.NV_VGLIST] = ListVolumeGroups()
427
428   if constants.NV_VERSION in what:
429     result[constants.NV_VERSION] = constants.PROTOCOL_VERSION
430
431   if constants.NV_HVINFO in what:
432     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
433     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
434
435   if constants.NV_DRBDLIST in what:
436     try:
437       used_minors = bdev.DRBD8.GetUsedDevs().keys()
438     except errors.BlockDeviceError:
439       logging.warning("Can't get used minors list", exc_info=True)
440       used_minors = []
441     result[constants.NV_DRBDLIST] = used_minors
442
443   return result
444
445
446 def GetVolumeList(vg_name):
447   """Compute list of logical volumes and their size.
448
449   @type vg_name: str
450   @param vg_name: the volume group whose LVs we should list
451   @rtype: dict
452   @return:
453       dictionary of all partions (key) with value being a tuple of
454       their size (in MiB), inactive and online status::
455
456         {'test1': ('20.06', True, True)}
457
458       in case of errors, a string is returned with the error
459       details.
460
461   """
462   lvs = {}
463   sep = '|'
464   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
465                          "--separator=%s" % sep,
466                          "-olv_name,lv_size,lv_attr", vg_name])
467   if result.failed:
468     logging.error("Failed to list logical volumes, lvs output: %s",
469                   result.output)
470     return result.output
471
472   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
473   for line in result.stdout.splitlines():
474     line = line.strip()
475     match = valid_line_re.match(line)
476     if not match:
477       logging.error("Invalid line returned from lvs output: '%s'", line)
478       continue
479     name, size, attr = match.groups()
480     inactive = attr[4] == '-'
481     online = attr[5] == 'o'
482     lvs[name] = (size, inactive, online)
483
484   return lvs
485
486
487 def ListVolumeGroups():
488   """List the volume groups and their size.
489
490   @rtype: dict
491   @return: dictionary with keys volume name and values the
492       size of the volume
493
494   """
495   return utils.ListVolumeGroups()
496
497
498 def NodeVolumes():
499   """List all volumes on this node.
500
501   @rtype: list
502   @return:
503     A list of dictionaries, each having four keys:
504       - name: the logical volume name,
505       - size: the size of the logical volume
506       - dev: the physical device on which the LV lives
507       - vg: the volume group to which it belongs
508
509     In case of errors, we return an empty list and log the
510     error.
511
512     Note that since a logical volume can live on multiple physical
513     volumes, the resulting list might include a logical volume
514     multiple times.
515
516   """
517   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
518                          "--separator=|",
519                          "--options=lv_name,lv_size,devices,vg_name"])
520   if result.failed:
521     logging.error("Failed to list logical volumes, lvs output: %s",
522                   result.output)
523     return []
524
525   def parse_dev(dev):
526     if '(' in dev:
527       return dev.split('(')[0]
528     else:
529       return dev
530
531   def map_line(line):
532     return {
533       'name': line[0].strip(),
534       'size': line[1].strip(),
535       'dev': parse_dev(line[2].strip()),
536       'vg': line[3].strip(),
537     }
538
539   return [map_line(line.split('|')) for line in result.stdout.splitlines()
540           if line.count('|') >= 3]
541
542
543 def BridgesExist(bridges_list):
544   """Check if a list of bridges exist on the current node.
545
546   @rtype: boolean
547   @return: C{True} if all of them exist, C{False} otherwise
548
549   """
550   for bridge in bridges_list:
551     if not utils.BridgeExists(bridge):
552       return False
553
554   return True
555
556
557 def GetInstanceList(hypervisor_list):
558   """Provides a list of instances.
559
560   @type hypervisor_list: list
561   @param hypervisor_list: the list of hypervisors to query information
562
563   @rtype: list
564   @return: a list of all running instances on the current node
565     - instance1.example.com
566     - instance2.example.com
567
568   """
569   results = []
570   for hname in hypervisor_list:
571     try:
572       names = hypervisor.GetHypervisor(hname).ListInstances()
573       results.extend(names)
574     except errors.HypervisorError, err:
575       logging.exception("Error enumerating instances for hypevisor %s", hname)
576       # FIXME: should we somehow not propagate this to the master?
577       raise
578
579   return results
580
581
582 def GetInstanceInfo(instance, hname):
583   """Gives back the informations about an instance as a dictionary.
584
585   @type instance: string
586   @param instance: the instance name
587   @type hname: string
588   @param hname: the hypervisor type of the instance
589
590   @rtype: dict
591   @return: dictionary with the following keys:
592       - memory: memory size of instance (int)
593       - state: xen state of instance (string)
594       - time: cpu time of instance (float)
595
596   """
597   output = {}
598
599   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
600   if iinfo is not None:
601     output['memory'] = iinfo[2]
602     output['state'] = iinfo[4]
603     output['time'] = iinfo[5]
604
605   return output
606
607
608 def GetInstanceMigratable(instance):
609   """Gives whether an instance can be migrated.
610
611   @type instance: L{objects.Instance}
612   @param instance: object representing the instance to be checked.
613
614   @rtype: tuple
615   @return: tuple of (result, description) where:
616       - result: whether the instance can be migrated or not
617       - description: a description of the issue, if relevant
618
619   """
620   hyper = hypervisor.GetHypervisor(instance.hypervisor)
621   if instance.name not in hyper.ListInstances():
622     return (False, 'not running')
623
624   for idx in range(len(instance.disks)):
625     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
626     if not os.path.islink(link_name):
627       return (False, 'not restarted since ganeti 1.2.5')
628
629   return (True, '')
630
631
632 def GetAllInstancesInfo(hypervisor_list):
633   """Gather data about all instances.
634
635   This is the equivalent of L{GetInstanceInfo}, except that it
636   computes data for all instances at once, thus being faster if one
637   needs data about more than one instance.
638
639   @type hypervisor_list: list
640   @param hypervisor_list: list of hypervisors to query for instance data
641
642   @rtype: dict
643   @return: dictionary of instance: data, with data having the following keys:
644       - memory: memory size of instance (int)
645       - state: xen state of instance (string)
646       - time: cpu time of instance (float)
647       - vcpus: the number of vcpus
648
649   """
650   output = {}
651
652   for hname in hypervisor_list:
653     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
654     if iinfo:
655       for name, inst_id, memory, vcpus, state, times in iinfo:
656         value = {
657           'memory': memory,
658           'vcpus': vcpus,
659           'state': state,
660           'time': times,
661           }
662         if name in output and output[name] != value:
663           raise errors.HypervisorError("Instance %s running duplicate"
664                                        " with different parameters" % name)
665         output[name] = value
666
667   return output
668
669
670 def AddOSToInstance(instance):
671   """Add an OS to an instance.
672
673   @type instance: L{objects.Instance}
674   @param instance: Instance whose OS is to be installed
675   @rtype: boolean
676   @return: the success of the operation
677
678   """
679   inst_os = OSFromDisk(instance.os)
680
681   create_env = OSEnvironment(instance)
682
683   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
684                                      instance.name, int(time.time()))
685
686   result = utils.RunCmd([inst_os.create_script], env=create_env,
687                         cwd=inst_os.path, output=logfile,)
688   if result.failed:
689     logging.error("os create command '%s' returned error: %s, logfile: %s,"
690                   " output: %s", result.cmd, result.fail_reason, logfile,
691                   result.output)
692     lines = [val.encode("string_escape")
693              for val in utils.TailFile(logfile, lines=20)]
694     return (False, "OS create script failed (%s), last lines in the"
695             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
696
697   return (True, "Successfully installed")
698
699
700 def RunRenameInstance(instance, old_name):
701   """Run the OS rename script for an instance.
702
703   @type instance: L{objects.Instance}
704   @param instance: Instance whose OS is to be installed
705   @type old_name: string
706   @param old_name: previous instance name
707   @rtype: boolean
708   @return: the success of the operation
709
710   """
711   inst_os = OSFromDisk(instance.os)
712
713   rename_env = OSEnvironment(instance)
714   rename_env['OLD_INSTANCE_NAME'] = old_name
715
716   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
717                                            old_name,
718                                            instance.name, int(time.time()))
719
720   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
721                         cwd=inst_os.path, output=logfile)
722
723   if result.failed:
724     logging.error("os create command '%s' returned error: %s output: %s",
725                   result.cmd, result.fail_reason, result.output)
726     lines = [val.encode("string_escape")
727              for val in utils.TailFile(logfile, lines=20)]
728     return (False, "OS rename script failed (%s), last lines in the"
729             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
730
731   return (True, "Rename successful")
732
733
734 def _GetVGInfo(vg_name):
735   """Get informations about the volume group.
736
737   @type vg_name: str
738   @param vg_name: the volume group which we query
739   @rtype: dict
740   @return:
741     A dictionary with the following keys:
742       - C{vg_size} is the total size of the volume group in MiB
743       - C{vg_free} is the free size of the volume group in MiB
744       - C{pv_count} are the number of physical disks in that VG
745
746     If an error occurs during gathering of data, we return the same dict
747     with keys all set to None.
748
749   """
750   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
751
752   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
753                          "--nosuffix", "--units=m", "--separator=:", vg_name])
754
755   if retval.failed:
756     logging.error("volume group %s not present", vg_name)
757     return retdic
758   valarr = retval.stdout.strip().rstrip(':').split(':')
759   if len(valarr) == 3:
760     try:
761       retdic = {
762         "vg_size": int(round(float(valarr[0]), 0)),
763         "vg_free": int(round(float(valarr[1]), 0)),
764         "pv_count": int(valarr[2]),
765         }
766     except ValueError, err:
767       logging.exception("Fail to parse vgs output")
768   else:
769     logging.error("vgs output has the wrong number of fields (expected"
770                   " three): %s", str(valarr))
771   return retdic
772
773
774 def _GetBlockDevSymlinkPath(instance_name, idx):
775   return os.path.join(constants.DISK_LINKS_DIR,
776                       "%s:%d" % (instance_name, idx))
777
778
779 def _SymlinkBlockDev(instance_name, device_path, idx):
780   """Set up symlinks to a instance's block device.
781
782   This is an auxiliary function run when an instance is start (on the primary
783   node) or when an instance is migrated (on the target node).
784
785
786   @param instance_name: the name of the target instance
787   @param device_path: path of the physical block device, on the node
788   @param idx: the disk index
789   @return: absolute path to the disk's symlink
790
791   """
792   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
793   try:
794     os.symlink(device_path, link_name)
795   except OSError, err:
796     if err.errno == errno.EEXIST:
797       if (not os.path.islink(link_name) or
798           os.readlink(link_name) != device_path):
799         os.remove(link_name)
800         os.symlink(device_path, link_name)
801     else:
802       raise
803
804   return link_name
805
806
807 def _RemoveBlockDevLinks(instance_name, disks):
808   """Remove the block device symlinks belonging to the given instance.
809
810   """
811   for idx, disk in enumerate(disks):
812     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
813     if os.path.islink(link_name):
814       try:
815         os.remove(link_name)
816       except OSError:
817         logging.exception("Can't remove symlink '%s'", link_name)
818
819
820 def _GatherAndLinkBlockDevs(instance):
821   """Set up an instance's block device(s).
822
823   This is run on the primary node at instance startup. The block
824   devices must be already assembled.
825
826   @type instance: L{objects.Instance}
827   @param instance: the instance whose disks we shoul assemble
828   @rtype: list
829   @return: list of (disk_object, device_path)
830
831   """
832   block_devices = []
833   for idx, disk in enumerate(instance.disks):
834     device = _RecursiveFindBD(disk)
835     if device is None:
836       raise errors.BlockDeviceError("Block device '%s' is not set up." %
837                                     str(disk))
838     device.Open()
839     try:
840       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
841     except OSError, e:
842       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
843                                     e.strerror)
844
845     block_devices.append((disk, link_name))
846
847   return block_devices
848
849
850 def StartInstance(instance, extra_args):
851   """Start an instance.
852
853   @type instance: L{objects.Instance}
854   @param instance: the instance object
855   @rtype: boolean
856   @return: whether the startup was successful or not
857
858   """
859   running_instances = GetInstanceList([instance.hypervisor])
860
861   if instance.name in running_instances:
862     return (True, "Already running")
863
864   try:
865     block_devices = _GatherAndLinkBlockDevs(instance)
866     hyper = hypervisor.GetHypervisor(instance.hypervisor)
867     hyper.StartInstance(instance, block_devices, extra_args)
868   except errors.BlockDeviceError, err:
869     logging.exception("Failed to start instance")
870     return (False, "Block device error: %s" % str(err))
871   except errors.HypervisorError, err:
872     logging.exception("Failed to start instance")
873     _RemoveBlockDevLinks(instance.name, instance.disks)
874     return (False, "Hypervisor error: %s" % str(err))
875
876   return (True, "Instance started successfully")
877
878
879 def ShutdownInstance(instance):
880   """Shut an instance down.
881
882   @note: this functions uses polling with a hardcoded timeout.
883
884   @type instance: L{objects.Instance}
885   @param instance: the instance object
886   @rtype: boolean
887   @return: whether the startup was successful or not
888
889   """
890   hv_name = instance.hypervisor
891   running_instances = GetInstanceList([hv_name])
892
893   if instance.name not in running_instances:
894     return True
895
896   hyper = hypervisor.GetHypervisor(hv_name)
897   try:
898     hyper.StopInstance(instance)
899   except errors.HypervisorError, err:
900     logging.error("Failed to stop instance: %s" % err)
901     return False
902
903   # test every 10secs for 2min
904
905   time.sleep(1)
906   for dummy in range(11):
907     if instance.name not in GetInstanceList([hv_name]):
908       break
909     time.sleep(10)
910   else:
911     # the shutdown did not succeed
912     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
913
914     try:
915       hyper.StopInstance(instance, force=True)
916     except errors.HypervisorError, err:
917       logging.exception("Failed to stop instance: %s" % err)
918       return False
919
920     time.sleep(1)
921     if instance.name in GetInstanceList([hv_name]):
922       logging.error("could not shutdown instance '%s' even by destroy",
923                     instance.name)
924       return False
925
926   _RemoveBlockDevLinks(instance.name, instance.disks)
927
928   return True
929
930
931 def RebootInstance(instance, reboot_type, extra_args):
932   """Reboot an instance.
933
934   @type instance: L{objects.Instance}
935   @param instance: the instance object to reboot
936   @type reboot_type: str
937   @param reboot_type: the type of reboot, one the following
938     constants:
939       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
940         instance OS, do not recreate the VM
941       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
942         restart the VM (at the hypervisor level)
943       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
944         is not accepted here, since that mode is handled
945         differently
946   @rtype: boolean
947   @return: the success of the operation
948
949   """
950   running_instances = GetInstanceList([instance.hypervisor])
951
952   if instance.name not in running_instances:
953     logging.error("Cannot reboot instance that is not running")
954     return False
955
956   hyper = hypervisor.GetHypervisor(instance.hypervisor)
957   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
958     try:
959       hyper.RebootInstance(instance)
960     except errors.HypervisorError, err:
961       logging.exception("Failed to soft reboot instance")
962       return False
963   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
964     try:
965       ShutdownInstance(instance)
966       StartInstance(instance, extra_args)
967     except errors.HypervisorError, err:
968       logging.exception("Failed to hard reboot instance")
969       return False
970   else:
971     raise errors.ParameterError("reboot_type invalid")
972
973   return True
974
975
976 def MigrationInfo(instance):
977   """Gather information about an instance to be migrated.
978
979   @type instance: L{objects.Instance}
980   @param instance: the instance definition
981
982   """
983   hyper = hypervisor.GetHypervisor(instance.hypervisor)
984   try:
985     info = hyper.MigrationInfo(instance)
986   except errors.HypervisorError, err:
987     msg = "Failed to fetch migration information"
988     logging.exception(msg)
989     return (False, '%s: %s' % (msg, err))
990   return (True, info)
991
992
993 def AcceptInstance(instance, info, target):
994   """Prepare the node to accept an instance.
995
996   @type instance: L{objects.Instance}
997   @param instance: the instance definition
998   @type info: string/data (opaque)
999   @param info: migration information, from the source node
1000   @type target: string
1001   @param target: target host (usually ip), on this node
1002
1003   """
1004   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1005   try:
1006     hyper.AcceptInstance(instance, info, target)
1007   except errors.HypervisorError, err:
1008     msg = "Failed to accept instance"
1009     logging.exception(msg)
1010     return (False, '%s: %s' % (msg, err))
1011   return (True, "Accept successfull")
1012
1013
1014 def FinalizeMigration(instance, info, success):
1015   """Finalize any preparation to accept an instance.
1016
1017   @type instance: L{objects.Instance}
1018   @param instance: the instance definition
1019   @type info: string/data (opaque)
1020   @param info: migration information, from the source node
1021   @type success: boolean
1022   @param success: whether the migration was a success or a failure
1023
1024   """
1025   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1026   try:
1027     hyper.FinalizeMigration(instance, info, success)
1028   except errors.HypervisorError, err:
1029     msg = "Failed to finalize migration"
1030     logging.exception(msg)
1031     return (False, '%s: %s' % (msg, err))
1032   return (True, "Migration Finalized")
1033
1034
1035 def MigrateInstance(instance, target, live):
1036   """Migrates an instance to another node.
1037
1038   @type instance: L{objects.Instance}
1039   @param instance: the instance definition
1040   @type target: string
1041   @param target: the target node name
1042   @type live: boolean
1043   @param live: whether the migration should be done live or not (the
1044       interpretation of this parameter is left to the hypervisor)
1045   @rtype: tuple
1046   @return: a tuple of (success, msg) where:
1047       - succes is a boolean denoting the success/failure of the operation
1048       - msg is a string with details in case of failure
1049
1050   """
1051   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1052
1053   try:
1054     hyper.MigrateInstance(instance.name, target, live)
1055   except errors.HypervisorError, err:
1056     msg = "Failed to migrate instance"
1057     logging.exception(msg)
1058     return (False, "%s: %s" % (msg, err))
1059   return (True, "Migration successfull")
1060
1061
1062 def CreateBlockDevice(disk, size, owner, on_primary, info):
1063   """Creates a block device for an instance.
1064
1065   @type disk: L{objects.Disk}
1066   @param disk: the object describing the disk we should create
1067   @type size: int
1068   @param size: the size of the physical underlying device, in MiB
1069   @type owner: str
1070   @param owner: the name of the instance for which disk is created,
1071       used for device cache data
1072   @type on_primary: boolean
1073   @param on_primary:  indicates if it is the primary node or not
1074   @type info: string
1075   @param info: string that will be sent to the physical device
1076       creation, used for example to set (LVM) tags on LVs
1077
1078   @return: the new unique_id of the device (this can sometime be
1079       computed only after creation), or None. On secondary nodes,
1080       it's not required to return anything.
1081
1082   """
1083   clist = []
1084   if disk.children:
1085     for child in disk.children:
1086       crdev = _RecursiveAssembleBD(child, owner, on_primary)
1087       if on_primary or disk.AssembleOnSecondary():
1088         # we need the children open in case the device itself has to
1089         # be assembled
1090         crdev.Open()
1091       clist.append(crdev)
1092
1093   try:
1094     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1095   except errors.GenericError, err:
1096     return False, "Can't create block device: %s" % str(err)
1097
1098   if on_primary or disk.AssembleOnSecondary():
1099     if not device.Assemble():
1100       errorstring = "Can't assemble device after creation, very unusual event"
1101       logging.error(errorstring)
1102       return False, errorstring
1103     device.SetSyncSpeed(constants.SYNC_SPEED)
1104     if on_primary or disk.OpenOnSecondary():
1105       device.Open(force=True)
1106     DevCacheManager.UpdateCache(device.dev_path, owner,
1107                                 on_primary, disk.iv_name)
1108
1109   device.SetInfo(info)
1110
1111   physical_id = device.unique_id
1112   return True, physical_id
1113
1114
1115 def RemoveBlockDevice(disk):
1116   """Remove a block device.
1117
1118   @note: This is intended to be called recursively.
1119
1120   @type disk: L{objects.Disk}
1121   @param disk: the disk object we should remove
1122   @rtype: boolean
1123   @return: the success of the operation
1124
1125   """
1126   try:
1127     rdev = _RecursiveFindBD(disk)
1128   except errors.BlockDeviceError, err:
1129     # probably can't attach
1130     logging.info("Can't attach to device %s in remove", disk)
1131     rdev = None
1132   if rdev is not None:
1133     r_path = rdev.dev_path
1134     result = rdev.Remove()
1135     if result:
1136       DevCacheManager.RemoveCache(r_path)
1137   else:
1138     result = True
1139   if disk.children:
1140     for child in disk.children:
1141       result = result and RemoveBlockDevice(child)
1142   return result
1143
1144
1145 def _RecursiveAssembleBD(disk, owner, as_primary):
1146   """Activate a block device for an instance.
1147
1148   This is run on the primary and secondary nodes for an instance.
1149
1150   @note: this function is called recursively.
1151
1152   @type disk: L{objects.Disk}
1153   @param disk: the disk we try to assemble
1154   @type owner: str
1155   @param owner: the name of the instance which owns the disk
1156   @type as_primary: boolean
1157   @param as_primary: if we should make the block device
1158       read/write
1159
1160   @return: the assembled device or None (in case no device
1161       was assembled)
1162   @raise errors.BlockDeviceError: in case there is an error
1163       during the activation of the children or the device
1164       itself
1165
1166   """
1167   children = []
1168   if disk.children:
1169     mcn = disk.ChildrenNeeded()
1170     if mcn == -1:
1171       mcn = 0 # max number of Nones allowed
1172     else:
1173       mcn = len(disk.children) - mcn # max number of Nones
1174     for chld_disk in disk.children:
1175       try:
1176         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1177       except errors.BlockDeviceError, err:
1178         if children.count(None) >= mcn:
1179           raise
1180         cdev = None
1181         logging.debug("Error in child activation: %s", str(err))
1182       children.append(cdev)
1183
1184   if as_primary or disk.AssembleOnSecondary():
1185     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1186     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1187     result = r_dev
1188     if as_primary or disk.OpenOnSecondary():
1189       r_dev.Open()
1190     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1191                                 as_primary, disk.iv_name)
1192
1193   else:
1194     result = True
1195   return result
1196
1197
1198 def AssembleBlockDevice(disk, owner, as_primary):
1199   """Activate a block device for an instance.
1200
1201   This is a wrapper over _RecursiveAssembleBD.
1202
1203   @rtype: str or boolean
1204   @return: a C{/dev/...} path for primary nodes, and
1205       C{True} for secondary nodes
1206
1207   """
1208   result = _RecursiveAssembleBD(disk, owner, as_primary)
1209   if isinstance(result, bdev.BlockDev):
1210     result = result.dev_path
1211   return result
1212
1213
1214 def ShutdownBlockDevice(disk):
1215   """Shut down a block device.
1216
1217   First, if the device is assembled (Attach() is successfull), then
1218   the device is shutdown. Then the children of the device are
1219   shutdown.
1220
1221   This function is called recursively. Note that we don't cache the
1222   children or such, as oppossed to assemble, shutdown of different
1223   devices doesn't require that the upper device was active.
1224
1225   @type disk: L{objects.Disk}
1226   @param disk: the description of the disk we should
1227       shutdown
1228   @rtype: boolean
1229   @return: the success of the operation
1230
1231   """
1232   r_dev = _RecursiveFindBD(disk)
1233   if r_dev is not None:
1234     r_path = r_dev.dev_path
1235     result = r_dev.Shutdown()
1236     if result:
1237       DevCacheManager.RemoveCache(r_path)
1238   else:
1239     result = True
1240   if disk.children:
1241     for child in disk.children:
1242       result = result and ShutdownBlockDevice(child)
1243   return result
1244
1245
1246 def MirrorAddChildren(parent_cdev, new_cdevs):
1247   """Extend a mirrored block device.
1248
1249   @type parent_cdev: L{objects.Disk}
1250   @param parent_cdev: the disk to which we should add children
1251   @type new_cdevs: list of L{objects.Disk}
1252   @param new_cdevs: the list of children which we should add
1253   @rtype: boolean
1254   @return: the success of the operation
1255
1256   """
1257   parent_bdev = _RecursiveFindBD(parent_cdev)
1258   if parent_bdev is None:
1259     logging.error("Can't find parent device")
1260     return False
1261   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1262   if new_bdevs.count(None) > 0:
1263     logging.error("Can't find new device(s) to add: %s:%s",
1264                   new_bdevs, new_cdevs)
1265     return False
1266   parent_bdev.AddChildren(new_bdevs)
1267   return True
1268
1269
1270 def MirrorRemoveChildren(parent_cdev, new_cdevs):
1271   """Shrink a mirrored block device.
1272
1273   @type parent_cdev: L{objects.Disk}
1274   @param parent_cdev: the disk from which we should remove children
1275   @type new_cdevs: list of L{objects.Disk}
1276   @param new_cdevs: the list of children which we should remove
1277   @rtype: boolean
1278   @return: the success of the operation
1279
1280   """
1281   parent_bdev = _RecursiveFindBD(parent_cdev)
1282   if parent_bdev is None:
1283     logging.error("Can't find parent in remove children: %s", parent_cdev)
1284     return False
1285   devs = []
1286   for disk in new_cdevs:
1287     rpath = disk.StaticDevPath()
1288     if rpath is None:
1289       bd = _RecursiveFindBD(disk)
1290       if bd is None:
1291         logging.error("Can't find dynamic device %s while removing children",
1292                       disk)
1293         return False
1294       else:
1295         devs.append(bd.dev_path)
1296     else:
1297       devs.append(rpath)
1298   parent_bdev.RemoveChildren(devs)
1299   return True
1300
1301
1302 def GetMirrorStatus(disks):
1303   """Get the mirroring status of a list of devices.
1304
1305   @type disks: list of L{objects.Disk}
1306   @param disks: the list of disks which we should query
1307   @rtype: disk
1308   @return:
1309       a list of (mirror_done, estimated_time) tuples, which
1310       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1311   @raise errors.BlockDeviceError: if any of the disks cannot be
1312       found
1313
1314   """
1315   stats = []
1316   for dsk in disks:
1317     rbd = _RecursiveFindBD(dsk)
1318     if rbd is None:
1319       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1320     stats.append(rbd.CombinedSyncStatus())
1321   return stats
1322
1323
1324 def _RecursiveFindBD(disk):
1325   """Check if a device is activated.
1326
1327   If so, return informations about the real device.
1328
1329   @type disk: L{objects.Disk}
1330   @param disk: the disk object we need to find
1331
1332   @return: None if the device can't be found,
1333       otherwise the device instance
1334
1335   """
1336   children = []
1337   if disk.children:
1338     for chdisk in disk.children:
1339       children.append(_RecursiveFindBD(chdisk))
1340
1341   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1342
1343
1344 def FindBlockDevice(disk):
1345   """Check if a device is activated.
1346
1347   If it is, return informations about the real device.
1348
1349   @type disk: L{objects.Disk}
1350   @param disk: the disk to find
1351   @rtype: None or tuple
1352   @return: None if the disk cannot be found, otherwise a
1353       tuple (device_path, major, minor, sync_percent,
1354       estimated_time, is_degraded)
1355
1356   """
1357   rbd = _RecursiveFindBD(disk)
1358   if rbd is None:
1359     return rbd
1360   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1361
1362
1363 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1364   """Write a file to the filesystem.
1365
1366   This allows the master to overwrite(!) a file. It will only perform
1367   the operation if the file belongs to a list of configuration files.
1368
1369   @type file_name: str
1370   @param file_name: the target file name
1371   @type data: str
1372   @param data: the new contents of the file
1373   @type mode: int
1374   @param mode: the mode to give the file (can be None)
1375   @type uid: int
1376   @param uid: the owner of the file (can be -1 for default)
1377   @type gid: int
1378   @param gid: the group of the file (can be -1 for default)
1379   @type atime: float
1380   @param atime: the atime to set on the file (can be None)
1381   @type mtime: float
1382   @param mtime: the mtime to set on the file (can be None)
1383   @rtype: boolean
1384   @return: the success of the operation; errors are logged
1385       in the node daemon log
1386
1387   """
1388   if not os.path.isabs(file_name):
1389     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1390                   file_name)
1391     return False
1392
1393   allowed_files = [
1394     constants.CLUSTER_CONF_FILE,
1395     constants.ETC_HOSTS,
1396     constants.SSH_KNOWN_HOSTS_FILE,
1397     constants.VNC_PASSWORD_FILE,
1398     ]
1399
1400   if file_name not in allowed_files:
1401     logging.error("Filename passed to UploadFile not in allowed"
1402                  " upload targets: '%s'", file_name)
1403     return False
1404
1405   raw_data = _Decompress(data)
1406
1407   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1408                   atime=atime, mtime=mtime)
1409   return True
1410
1411
1412 def WriteSsconfFiles(values):
1413   """Update all ssconf files.
1414
1415   Wrapper around the SimpleStore.WriteFiles.
1416
1417   """
1418   ssconf.SimpleStore().WriteFiles(values)
1419
1420
1421 def _ErrnoOrStr(err):
1422   """Format an EnvironmentError exception.
1423
1424   If the L{err} argument has an errno attribute, it will be looked up
1425   and converted into a textual C{E...} description. Otherwise the
1426   string representation of the error will be returned.
1427
1428   @type err: L{EnvironmentError}
1429   @param err: the exception to format
1430
1431   """
1432   if hasattr(err, 'errno'):
1433     detail = errno.errorcode[err.errno]
1434   else:
1435     detail = str(err)
1436   return detail
1437
1438
1439 def _OSOndiskVersion(name, os_dir):
1440   """Compute and return the API version of a given OS.
1441
1442   This function will try to read the API version of the OS given by
1443   the 'name' parameter and residing in the 'os_dir' directory.
1444
1445   @type name: str
1446   @param name: the OS name we should look for
1447   @type os_dir: str
1448   @param os_dir: the directory inwhich we should look for the OS
1449   @rtype: int or None
1450   @return:
1451       Either an integer denoting the version or None in the
1452       case when this is not a valid OS name.
1453   @raise errors.InvalidOS: if the OS cannot be found
1454
1455   """
1456   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1457
1458   try:
1459     st = os.stat(api_file)
1460   except EnvironmentError, err:
1461     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1462                            " found (%s)" % _ErrnoOrStr(err))
1463
1464   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1465     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1466                            " a regular file")
1467
1468   try:
1469     f = open(api_file)
1470     try:
1471       api_versions = f.readlines()
1472     finally:
1473       f.close()
1474   except EnvironmentError, err:
1475     raise errors.InvalidOS(name, os_dir, "error while reading the"
1476                            " API version (%s)" % _ErrnoOrStr(err))
1477
1478   api_versions = [version.strip() for version in api_versions]
1479   try:
1480     api_versions = [int(version) for version in api_versions]
1481   except (TypeError, ValueError), err:
1482     raise errors.InvalidOS(name, os_dir,
1483                            "API version is not integer (%s)" % str(err))
1484
1485   return api_versions
1486
1487
1488 def DiagnoseOS(top_dirs=None):
1489   """Compute the validity for all OSes.
1490
1491   @type top_dirs: list
1492   @param top_dirs: the list of directories in which to
1493       search (if not given defaults to
1494       L{constants.OS_SEARCH_PATH})
1495   @rtype: list of L{objects.OS}
1496   @return: an OS object for each name in all the given
1497       directories
1498
1499   """
1500   if top_dirs is None:
1501     top_dirs = constants.OS_SEARCH_PATH
1502
1503   result = []
1504   for dir_name in top_dirs:
1505     if os.path.isdir(dir_name):
1506       try:
1507         f_names = utils.ListVisibleFiles(dir_name)
1508       except EnvironmentError, err:
1509         logging.exception("Can't list the OS directory %s", dir_name)
1510         break
1511       for name in f_names:
1512         try:
1513           os_inst = OSFromDisk(name, base_dir=dir_name)
1514           result.append(os_inst)
1515         except errors.InvalidOS, err:
1516           result.append(objects.OS.FromInvalidOS(err))
1517
1518   return result
1519
1520
1521 def OSFromDisk(name, base_dir=None):
1522   """Create an OS instance from disk.
1523
1524   This function will return an OS instance if the given name is a
1525   valid OS name. Otherwise, it will raise an appropriate
1526   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1527
1528   @type base_dir: string
1529   @keyword base_dir: Base directory containing OS installations.
1530                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1531   @rtype: L{objects.OS}
1532   @return: the OS instance if we find a valid one
1533   @raise errors.InvalidOS: if we don't find a valid OS
1534
1535   """
1536   if base_dir is None:
1537     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1538     if os_dir is None:
1539       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1540   else:
1541     os_dir = os.path.sep.join([base_dir, name])
1542
1543   api_versions = _OSOndiskVersion(name, os_dir)
1544
1545   if constants.OS_API_VERSION not in api_versions:
1546     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1547                            " (found %s want %s)"
1548                            % (api_versions, constants.OS_API_VERSION))
1549
1550   # OS Scripts dictionary, we will populate it with the actual script names
1551   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1552
1553   for script in os_scripts:
1554     os_scripts[script] = os.path.sep.join([os_dir, script])
1555
1556     try:
1557       st = os.stat(os_scripts[script])
1558     except EnvironmentError, err:
1559       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1560                              (script, _ErrnoOrStr(err)))
1561
1562     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1563       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1564                              script)
1565
1566     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1567       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1568                              script)
1569
1570
1571   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1572                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1573                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1574                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1575                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1576                     api_versions=api_versions)
1577
1578 def OSEnvironment(instance, debug=0):
1579   """Calculate the environment for an os script.
1580
1581   @type instance: L{objects.Instance}
1582   @param instance: target instance for the os script run
1583   @type debug: integer
1584   @param debug: debug level (0 or 1, for OS Api 10)
1585   @rtype: dict
1586   @return: dict of environment variables
1587   @raise errors.BlockDeviceError: if the block device
1588       cannot be found
1589
1590   """
1591   result = {}
1592   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1593   result['INSTANCE_NAME'] = instance.name
1594   result['HYPERVISOR'] = instance.hypervisor
1595   result['DISK_COUNT'] = '%d' % len(instance.disks)
1596   result['NIC_COUNT'] = '%d' % len(instance.nics)
1597   result['DEBUG_LEVEL'] = '%d' % debug
1598   for idx, disk in enumerate(instance.disks):
1599     real_disk = _RecursiveFindBD(disk)
1600     if real_disk is None:
1601       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1602                                     str(disk))
1603     real_disk.Open()
1604     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1605     # FIXME: When disks will have read-only mode, populate this
1606     result['DISK_%d_ACCESS' % idx] = 'W'
1607     if constants.HV_DISK_TYPE in instance.hvparams:
1608       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1609         instance.hvparams[constants.HV_DISK_TYPE]
1610     if disk.dev_type in constants.LDS_BLOCK:
1611       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1612     elif disk.dev_type == constants.LD_FILE:
1613       result['DISK_%d_BACKEND_TYPE' % idx] = \
1614         'file:%s' % disk.physical_id[0]
1615   for idx, nic in enumerate(instance.nics):
1616     result['NIC_%d_MAC' % idx] = nic.mac
1617     if nic.ip:
1618       result['NIC_%d_IP' % idx] = nic.ip
1619     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1620     if constants.HV_NIC_TYPE in instance.hvparams:
1621       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1622         instance.hvparams[constants.HV_NIC_TYPE]
1623
1624   return result
1625
1626 def GrowBlockDevice(disk, amount):
1627   """Grow a stack of block devices.
1628
1629   This function is called recursively, with the childrens being the
1630   first ones to resize.
1631
1632   @type disk: L{objects.Disk}
1633   @param disk: the disk to be grown
1634   @rtype: (status, result)
1635   @return: a tuple with the status of the operation
1636       (True/False), and the errors message if status
1637       is False
1638
1639   """
1640   r_dev = _RecursiveFindBD(disk)
1641   if r_dev is None:
1642     return False, "Cannot find block device %s" % (disk,)
1643
1644   try:
1645     r_dev.Grow(amount)
1646   except errors.BlockDeviceError, err:
1647     return False, str(err)
1648
1649   return True, None
1650
1651
1652 def SnapshotBlockDevice(disk):
1653   """Create a snapshot copy of a block device.
1654
1655   This function is called recursively, and the snapshot is actually created
1656   just for the leaf lvm backend device.
1657
1658   @type disk: L{objects.Disk}
1659   @param disk: the disk to be snapshotted
1660   @rtype: string
1661   @return: snapshot disk path
1662
1663   """
1664   if disk.children:
1665     if len(disk.children) == 1:
1666       # only one child, let's recurse on it
1667       return SnapshotBlockDevice(disk.children[0])
1668     else:
1669       # more than one child, choose one that matches
1670       for child in disk.children:
1671         if child.size == disk.size:
1672           # return implies breaking the loop
1673           return SnapshotBlockDevice(child)
1674   elif disk.dev_type == constants.LD_LV:
1675     r_dev = _RecursiveFindBD(disk)
1676     if r_dev is not None:
1677       # let's stay on the safe side and ask for the full size, for now
1678       return r_dev.Snapshot(disk.size)
1679     else:
1680       return None
1681   else:
1682     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1683                                  " '%s' of type '%s'" %
1684                                  (disk.unique_id, disk.dev_type))
1685
1686
1687 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1688   """Export a block device snapshot to a remote node.
1689
1690   @type disk: L{objects.Disk}
1691   @param disk: the description of the disk to export
1692   @type dest_node: str
1693   @param dest_node: the destination node to export to
1694   @type instance: L{objects.Instance}
1695   @param instance: the instance object to whom the disk belongs
1696   @type cluster_name: str
1697   @param cluster_name: the cluster name, needed for SSH hostalias
1698   @type idx: int
1699   @param idx: the index of the disk in the instance's disk list,
1700       used to export to the OS scripts environment
1701   @rtype: boolean
1702   @return: the success of the operation
1703
1704   """
1705   export_env = OSEnvironment(instance)
1706
1707   inst_os = OSFromDisk(instance.os)
1708   export_script = inst_os.export_script
1709
1710   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1711                                      instance.name, int(time.time()))
1712   if not os.path.exists(constants.LOG_OS_DIR):
1713     os.mkdir(constants.LOG_OS_DIR, 0750)
1714   real_disk = _RecursiveFindBD(disk)
1715   if real_disk is None:
1716     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1717                                   str(disk))
1718   real_disk.Open()
1719
1720   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1721   export_env['EXPORT_INDEX'] = str(idx)
1722
1723   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1724   destfile = disk.physical_id[1]
1725
1726   # the target command is built out of three individual commands,
1727   # which are joined by pipes; we check each individual command for
1728   # valid parameters
1729   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1730                                export_script, logfile)
1731
1732   comprcmd = "gzip"
1733
1734   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1735                                 destdir, destdir, destfile)
1736   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1737                                                    constants.GANETI_RUNAS,
1738                                                    destcmd)
1739
1740   # all commands have been checked, so we're safe to combine them
1741   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1742
1743   result = utils.RunCmd(command, env=export_env)
1744
1745   if result.failed:
1746     logging.error("os snapshot export command '%s' returned error: %s"
1747                   " output: %s", command, result.fail_reason, result.output)
1748     return False
1749
1750   return True
1751
1752
1753 def FinalizeExport(instance, snap_disks):
1754   """Write out the export configuration information.
1755
1756   @type instance: L{objects.Instance}
1757   @param instance: the instance which we export, used for
1758       saving configuration
1759   @type snap_disks: list of L{objects.Disk}
1760   @param snap_disks: list of snapshot block devices, which
1761       will be used to get the actual name of the dump file
1762
1763   @rtype: boolean
1764   @return: the success of the operation
1765
1766   """
1767   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1768   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1769
1770   config = objects.SerializableConfigParser()
1771
1772   config.add_section(constants.INISECT_EXP)
1773   config.set(constants.INISECT_EXP, 'version', '0')
1774   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1775   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1776   config.set(constants.INISECT_EXP, 'os', instance.os)
1777   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1778
1779   config.add_section(constants.INISECT_INS)
1780   config.set(constants.INISECT_INS, 'name', instance.name)
1781   config.set(constants.INISECT_INS, 'memory', '%d' %
1782              instance.beparams[constants.BE_MEMORY])
1783   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1784              instance.beparams[constants.BE_VCPUS])
1785   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1786
1787   nic_total = 0
1788   for nic_count, nic in enumerate(instance.nics):
1789     nic_total += 1
1790     config.set(constants.INISECT_INS, 'nic%d_mac' %
1791                nic_count, '%s' % nic.mac)
1792     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1793     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1794                '%s' % nic.bridge)
1795   # TODO: redundant: on load can read nics until it doesn't exist
1796   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1797
1798   disk_total = 0
1799   for disk_count, disk in enumerate(snap_disks):
1800     if disk:
1801       disk_total += 1
1802       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1803                  ('%s' % disk.iv_name))
1804       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1805                  ('%s' % disk.physical_id[1]))
1806       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1807                  ('%d' % disk.size))
1808
1809   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1810
1811   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1812                   data=config.Dumps())
1813   shutil.rmtree(finaldestdir, True)
1814   shutil.move(destdir, finaldestdir)
1815
1816   return True
1817
1818
1819 def ExportInfo(dest):
1820   """Get export configuration information.
1821
1822   @type dest: str
1823   @param dest: directory containing the export
1824
1825   @rtype: L{objects.SerializableConfigParser}
1826   @return: a serializable config file containing the
1827       export info
1828
1829   """
1830   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1831
1832   config = objects.SerializableConfigParser()
1833   config.read(cff)
1834
1835   if (not config.has_section(constants.INISECT_EXP) or
1836       not config.has_section(constants.INISECT_INS)):
1837     return None
1838
1839   return config
1840
1841
1842 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1843   """Import an os image into an instance.
1844
1845   @type instance: L{objects.Instance}
1846   @param instance: instance to import the disks into
1847   @type src_node: string
1848   @param src_node: source node for the disk images
1849   @type src_images: list of string
1850   @param src_images: absolute paths of the disk images
1851   @rtype: list of boolean
1852   @return: each boolean represent the success of importing the n-th disk
1853
1854   """
1855   import_env = OSEnvironment(instance)
1856   inst_os = OSFromDisk(instance.os)
1857   import_script = inst_os.import_script
1858
1859   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1860                                         instance.name, int(time.time()))
1861   if not os.path.exists(constants.LOG_OS_DIR):
1862     os.mkdir(constants.LOG_OS_DIR, 0750)
1863
1864   comprcmd = "gunzip"
1865   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1866                                import_script, logfile)
1867
1868   final_result = []
1869   for idx, image in enumerate(src_images):
1870     if image:
1871       destcmd = utils.BuildShellCmd('cat %s', image)
1872       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1873                                                        constants.GANETI_RUNAS,
1874                                                        destcmd)
1875       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1876       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1877       import_env['IMPORT_INDEX'] = str(idx)
1878       result = utils.RunCmd(command, env=import_env)
1879       if result.failed:
1880         logging.error("Disk import command '%s' returned error: %s"
1881                       " output: %s", command, result.fail_reason,
1882                       result.output)
1883         final_result.append(False)
1884       else:
1885         final_result.append(True)
1886     else:
1887       final_result.append(True)
1888
1889   return final_result
1890
1891
1892 def ListExports():
1893   """Return a list of exports currently available on this machine.
1894
1895   @rtype: list
1896   @return: list of the exports
1897
1898   """
1899   if os.path.isdir(constants.EXPORT_DIR):
1900     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1901   else:
1902     return []
1903
1904
1905 def RemoveExport(export):
1906   """Remove an existing export from the node.
1907
1908   @type export: str
1909   @param export: the name of the export to remove
1910   @rtype: boolean
1911   @return: the success of the operation
1912
1913   """
1914   target = os.path.join(constants.EXPORT_DIR, export)
1915
1916   shutil.rmtree(target)
1917   # TODO: catch some of the relevant exceptions and provide a pretty
1918   # error message if rmtree fails.
1919
1920   return True
1921
1922
1923 def RenameBlockDevices(devlist):
1924   """Rename a list of block devices.
1925
1926   @type devlist: list of tuples
1927   @param devlist: list of tuples of the form  (disk,
1928       new_logical_id, new_physical_id); disk is an
1929       L{objects.Disk} object describing the current disk,
1930       and new logical_id/physical_id is the name we
1931       rename it to
1932   @rtype: boolean
1933   @return: True if all renames succeeded, False otherwise
1934
1935   """
1936   result = True
1937   for disk, unique_id in devlist:
1938     dev = _RecursiveFindBD(disk)
1939     if dev is None:
1940       result = False
1941       continue
1942     try:
1943       old_rpath = dev.dev_path
1944       dev.Rename(unique_id)
1945       new_rpath = dev.dev_path
1946       if old_rpath != new_rpath:
1947         DevCacheManager.RemoveCache(old_rpath)
1948         # FIXME: we should add the new cache information here, like:
1949         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1950         # but we don't have the owner here - maybe parse from existing
1951         # cache? for now, we only lose lvm data when we rename, which
1952         # is less critical than DRBD or MD
1953     except errors.BlockDeviceError, err:
1954       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1955       result = False
1956   return result
1957
1958
1959 def _TransformFileStorageDir(file_storage_dir):
1960   """Checks whether given file_storage_dir is valid.
1961
1962   Checks wheter the given file_storage_dir is within the cluster-wide
1963   default file_storage_dir stored in SimpleStore. Only paths under that
1964   directory are allowed.
1965
1966   @type file_storage_dir: str
1967   @param file_storage_dir: the path to check
1968
1969   @return: the normalized path if valid, None otherwise
1970
1971   """
1972   cfg = _GetConfig()
1973   file_storage_dir = os.path.normpath(file_storage_dir)
1974   base_file_storage_dir = cfg.GetFileStorageDir()
1975   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1976       base_file_storage_dir):
1977     logging.error("file storage directory '%s' is not under base file"
1978                   " storage directory '%s'",
1979                   file_storage_dir, base_file_storage_dir)
1980     return None
1981   return file_storage_dir
1982
1983
1984 def CreateFileStorageDir(file_storage_dir):
1985   """Create file storage directory.
1986
1987   @type file_storage_dir: str
1988   @param file_storage_dir: directory to create
1989
1990   @rtype: tuple
1991   @return: tuple with first element a boolean indicating wheter dir
1992       creation was successful or not
1993
1994   """
1995   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1996   result = True,
1997   if not file_storage_dir:
1998     result = False,
1999   else:
2000     if os.path.exists(file_storage_dir):
2001       if not os.path.isdir(file_storage_dir):
2002         logging.error("'%s' is not a directory", file_storage_dir)
2003         result = False,
2004     else:
2005       try:
2006         os.makedirs(file_storage_dir, 0750)
2007       except OSError, err:
2008         logging.error("Cannot create file storage directory '%s': %s",
2009                       file_storage_dir, err)
2010         result = False,
2011   return result
2012
2013
2014 def RemoveFileStorageDir(file_storage_dir):
2015   """Remove file storage directory.
2016
2017   Remove it only if it's empty. If not log an error and return.
2018
2019   @type file_storage_dir: str
2020   @param file_storage_dir: the directory we should cleanup
2021   @rtype: tuple (success,)
2022   @return: tuple of one element, C{success}, denoting
2023       whether the operation was successfull
2024
2025   """
2026   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2027   result = True,
2028   if not file_storage_dir:
2029     result = False,
2030   else:
2031     if os.path.exists(file_storage_dir):
2032       if not os.path.isdir(file_storage_dir):
2033         logging.error("'%s' is not a directory", file_storage_dir)
2034         result = False,
2035       # deletes dir only if empty, otherwise we want to return False
2036       try:
2037         os.rmdir(file_storage_dir)
2038       except OSError, err:
2039         logging.exception("Cannot remove file storage directory '%s'",
2040                           file_storage_dir)
2041         result = False,
2042   return result
2043
2044
2045 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2046   """Rename the file storage directory.
2047
2048   @type old_file_storage_dir: str
2049   @param old_file_storage_dir: the current path
2050   @type new_file_storage_dir: str
2051   @param new_file_storage_dir: the name we should rename to
2052   @rtype: tuple (success,)
2053   @return: tuple of one element, C{success}, denoting
2054       whether the operation was successful
2055
2056   """
2057   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2058   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2059   result = True,
2060   if not old_file_storage_dir or not new_file_storage_dir:
2061     result = False,
2062   else:
2063     if not os.path.exists(new_file_storage_dir):
2064       if os.path.isdir(old_file_storage_dir):
2065         try:
2066           os.rename(old_file_storage_dir, new_file_storage_dir)
2067         except OSError, err:
2068           logging.exception("Cannot rename '%s' to '%s'",
2069                             old_file_storage_dir, new_file_storage_dir)
2070           result =  False,
2071       else:
2072         logging.error("'%s' is not a directory", old_file_storage_dir)
2073         result = False,
2074     else:
2075       if os.path.exists(old_file_storage_dir):
2076         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2077                       old_file_storage_dir, new_file_storage_dir)
2078         result = False,
2079   return result
2080
2081
2082 def _IsJobQueueFile(file_name):
2083   """Checks whether the given filename is in the queue directory.
2084
2085   @type file_name: str
2086   @param file_name: the file name we should check
2087   @rtype: boolean
2088   @return: whether the file is under the queue directory
2089
2090   """
2091   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2092   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2093
2094   if not result:
2095     logging.error("'%s' is not a file in the queue directory",
2096                   file_name)
2097
2098   return result
2099
2100
2101 def JobQueueUpdate(file_name, content):
2102   """Updates a file in the queue directory.
2103
2104   This is just a wrapper over L{utils.WriteFile}, with proper
2105   checking.
2106
2107   @type file_name: str
2108   @param file_name: the job file name
2109   @type content: str
2110   @param content: the new job contents
2111   @rtype: boolean
2112   @return: the success of the operation
2113
2114   """
2115   if not _IsJobQueueFile(file_name):
2116     return False
2117
2118   # Write and replace the file atomically
2119   utils.WriteFile(file_name, data=_Decompress(content))
2120
2121   return True
2122
2123
2124 def JobQueueRename(old, new):
2125   """Renames a job queue file.
2126
2127   This is just a wrapper over os.rename with proper checking.
2128
2129   @type old: str
2130   @param old: the old (actual) file name
2131   @type new: str
2132   @param new: the desired file name
2133   @rtype: boolean
2134   @return: the success of the operation
2135
2136   """
2137   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2138     return False
2139
2140   utils.RenameFile(old, new, mkdir=True)
2141
2142   return True
2143
2144
2145 def JobQueueSetDrainFlag(drain_flag):
2146   """Set the drain flag for the queue.
2147
2148   This will set or unset the queue drain flag.
2149
2150   @type drain_flag: boolean
2151   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2152   @rtype: boolean
2153   @return: always True
2154   @warning: the function always returns True
2155
2156   """
2157   if drain_flag:
2158     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2159   else:
2160     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2161
2162   return True
2163
2164
2165 def CloseBlockDevices(instance_name, disks):
2166   """Closes the given block devices.
2167
2168   This means they will be switched to secondary mode (in case of
2169   DRBD).
2170
2171   @param instance_name: if the argument is not empty, the symlinks
2172       of this instance will be removed
2173   @type disks: list of L{objects.Disk}
2174   @param disks: the list of disks to be closed
2175   @rtype: tuple (success, message)
2176   @return: a tuple of success and message, where success
2177       indicates the succes of the operation, and message
2178       which will contain the error details in case we
2179       failed
2180
2181   """
2182   bdevs = []
2183   for cf in disks:
2184     rd = _RecursiveFindBD(cf)
2185     if rd is None:
2186       return (False, "Can't find device %s" % cf)
2187     bdevs.append(rd)
2188
2189   msg = []
2190   for rd in bdevs:
2191     try:
2192       rd.Close()
2193     except errors.BlockDeviceError, err:
2194       msg.append(str(err))
2195   if msg:
2196     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2197   else:
2198     if instance_name:
2199       _RemoveBlockDevLinks(instance_name, disks)
2200     return (True, "All devices secondary")
2201
2202
2203 def ValidateHVParams(hvname, hvparams):
2204   """Validates the given hypervisor parameters.
2205
2206   @type hvname: string
2207   @param hvname: the hypervisor name
2208   @type hvparams: dict
2209   @param hvparams: the hypervisor parameters to be validated
2210   @rtype: tuple (success, message)
2211   @return: a tuple of success and message, where success
2212       indicates the succes of the operation, and message
2213       which will contain the error details in case we
2214       failed
2215
2216   """
2217   try:
2218     hv_type = hypervisor.GetHypervisor(hvname)
2219     hv_type.ValidateParameters(hvparams)
2220     return (True, "Validation passed")
2221   except errors.HypervisorError, err:
2222     return (False, str(err))
2223
2224
2225 def DemoteFromMC():
2226   """Demotes the current node from master candidate role.
2227
2228   """
2229   # try to ensure we're not the master by mistake
2230   master, myself = ssconf.GetMasterAndMyself()
2231   if master == myself:
2232     return (False, "ssconf status shows I'm the master node, will not demote")
2233   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2234   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2235     return (False, "The master daemon is running, will not demote")
2236   try:
2237     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2238   except EnvironmentError, err:
2239     if err.errno != errno.ENOENT:
2240       return (False, "Error while backing up cluster file: %s" % str(err))
2241   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2242   return (True, "Done")
2243
2244
2245 def _FindDisks(nodes_ip, disks):
2246   """Sets the physical ID on disks and returns the block devices.
2247
2248   """
2249   # set the correct physical ID
2250   my_name = utils.HostInfo().name
2251   for cf in disks:
2252     cf.SetPhysicalID(my_name, nodes_ip)
2253
2254   bdevs = []
2255
2256   for cf in disks:
2257     rd = _RecursiveFindBD(cf)
2258     if rd is None:
2259       return (False, "Can't find device %s" % cf)
2260     bdevs.append(rd)
2261   return (True, bdevs)
2262
2263
2264 def DrbdDisconnectNet(nodes_ip, disks):
2265   """Disconnects the network on a list of drbd devices.
2266
2267   """
2268   status, bdevs = _FindDisks(nodes_ip, disks)
2269   if not status:
2270     return status, bdevs
2271
2272   # disconnect disks
2273   for rd in bdevs:
2274     try:
2275       rd.DisconnectNet()
2276     except errors.BlockDeviceError, err:
2277       logging.exception("Failed to go into standalone mode")
2278       return (False, "Can't change network configuration: %s" % str(err))
2279   return (True, "All disks are now disconnected")
2280
2281
2282 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2283   """Attaches the network on a list of drbd devices.
2284
2285   """
2286   status, bdevs = _FindDisks(nodes_ip, disks)
2287   if not status:
2288     return status, bdevs
2289
2290   if multimaster:
2291     for idx, rd in enumerate(bdevs):
2292       try:
2293         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2294       except EnvironmentError, err:
2295         return (False, "Can't create symlink: %s" % str(err))
2296   # reconnect disks, switch to new master configuration and if
2297   # needed primary mode
2298   for rd in bdevs:
2299     try:
2300       rd.AttachNet(multimaster)
2301     except errors.BlockDeviceError, err:
2302       return (False, "Can't change network configuration: %s" % str(err))
2303   # wait until the disks are connected; we need to retry the re-attach
2304   # if the device becomes standalone, as this might happen if the one
2305   # node disconnects and reconnects in a different mode before the
2306   # other node reconnects; in this case, one or both of the nodes will
2307   # decide it has wrong configuration and switch to standalone
2308   RECONNECT_TIMEOUT = 2 * 60
2309   sleep_time = 0.100 # start with 100 miliseconds
2310   timeout_limit = time.time() + RECONNECT_TIMEOUT
2311   while time.time() < timeout_limit:
2312     all_connected = True
2313     for rd in bdevs:
2314       stats = rd.GetProcStatus()
2315       if not (stats.is_connected or stats.is_in_resync):
2316         all_connected = False
2317       if stats.is_standalone:
2318         # peer had different config info and this node became
2319         # standalone, even though this should not happen with the
2320         # new staged way of changing disk configs
2321         try:
2322           rd.ReAttachNet(multimaster)
2323         except errors.BlockDeviceError, err:
2324           return (False, "Can't change network configuration: %s" % str(err))
2325     if all_connected:
2326       break
2327     time.sleep(sleep_time)
2328     sleep_time = min(5, sleep_time * 1.5)
2329   if not all_connected:
2330     return (False, "Timeout in disk reconnecting")
2331   if multimaster:
2332     # change to primary mode
2333     for rd in bdevs:
2334       rd.Open()
2335   if multimaster:
2336     msg = "multi-master and primary"
2337   else:
2338     msg = "single-master"
2339   return (True, "Disks are now configured as %s" % msg)
2340
2341
2342 def DrbdWaitSync(nodes_ip, disks):
2343   """Wait until DRBDs have synchronized.
2344
2345   """
2346   status, bdevs = _FindDisks(nodes_ip, disks)
2347   if not status:
2348     return status, bdevs
2349
2350   min_resync = 100
2351   alldone = True
2352   failure = False
2353   for rd in bdevs:
2354     stats = rd.GetProcStatus()
2355     if not (stats.is_connected or stats.is_in_resync):
2356       failure = True
2357       break
2358     alldone = alldone and (not stats.is_in_resync)
2359     if stats.sync_percent is not None:
2360       min_resync = min(min_resync, stats.sync_percent)
2361   return (not failure, (alldone, min_resync))
2362
2363
2364 class HooksRunner(object):
2365   """Hook runner.
2366
2367   This class is instantiated on the node side (ganeti-noded) and not
2368   on the master side.
2369
2370   """
2371   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2372
2373   def __init__(self, hooks_base_dir=None):
2374     """Constructor for hooks runner.
2375
2376     @type hooks_base_dir: str or None
2377     @param hooks_base_dir: if not None, this overrides the
2378         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2379
2380     """
2381     if hooks_base_dir is None:
2382       hooks_base_dir = constants.HOOKS_BASE_DIR
2383     self._BASE_DIR = hooks_base_dir
2384
2385   @staticmethod
2386   def ExecHook(script, env):
2387     """Exec one hook script.
2388
2389     @type script: str
2390     @param script: the full path to the script
2391     @type env: dict
2392     @param env: the environment with which to exec the script
2393     @rtype: tuple (success, message)
2394     @return: a tuple of success and message, where success
2395         indicates the succes of the operation, and message
2396         which will contain the error details in case we
2397         failed
2398
2399     """
2400     # exec the process using subprocess and log the output
2401     fdstdin = None
2402     try:
2403       fdstdin = open("/dev/null", "r")
2404       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2405                                stderr=subprocess.STDOUT, close_fds=True,
2406                                shell=False, cwd="/", env=env)
2407       output = ""
2408       try:
2409         output = child.stdout.read(4096)
2410         child.stdout.close()
2411       except EnvironmentError, err:
2412         output += "Hook script error: %s" % str(err)
2413
2414       while True:
2415         try:
2416           result = child.wait()
2417           break
2418         except EnvironmentError, err:
2419           if err.errno == errno.EINTR:
2420             continue
2421           raise
2422     finally:
2423       # try not to leak fds
2424       for fd in (fdstdin, ):
2425         if fd is not None:
2426           try:
2427             fd.close()
2428           except EnvironmentError, err:
2429             # just log the error
2430             #logging.exception("Error while closing fd %s", fd)
2431             pass
2432
2433     return result == 0, output
2434
2435   def RunHooks(self, hpath, phase, env):
2436     """Run the scripts in the hooks directory.
2437
2438     @type hpath: str
2439     @param hpath: the path to the hooks directory which
2440         holds the scripts
2441     @type phase: str
2442     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2443         L{constants.HOOKS_PHASE_POST}
2444     @type env: dict
2445     @param env: dictionary with the environment for the hook
2446     @rtype: list
2447     @return: list of 3-element tuples:
2448       - script path
2449       - script result, either L{constants.HKR_SUCCESS} or
2450         L{constants.HKR_FAIL}
2451       - output of the script
2452
2453     @raise errors.ProgrammerError: for invalid input
2454         parameters
2455
2456     """
2457     if phase == constants.HOOKS_PHASE_PRE:
2458       suffix = "pre"
2459     elif phase == constants.HOOKS_PHASE_POST:
2460       suffix = "post"
2461     else:
2462       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2463     rr = []
2464
2465     subdir = "%s-%s.d" % (hpath, suffix)
2466     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2467     try:
2468       dir_contents = utils.ListVisibleFiles(dir_name)
2469     except OSError, err:
2470       # FIXME: must log output in case of failures
2471       return rr
2472
2473     # we use the standard python sort order,
2474     # so 00name is the recommended naming scheme
2475     dir_contents.sort()
2476     for relname in dir_contents:
2477       fname = os.path.join(dir_name, relname)
2478       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2479           self.RE_MASK.match(relname) is not None):
2480         rrval = constants.HKR_SKIP
2481         output = ""
2482       else:
2483         result, output = self.ExecHook(fname, env)
2484         if not result:
2485           rrval = constants.HKR_FAIL
2486         else:
2487           rrval = constants.HKR_SUCCESS
2488       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2489
2490     return rr
2491
2492
2493 class IAllocatorRunner(object):
2494   """IAllocator runner.
2495
2496   This class is instantiated on the node side (ganeti-noded) and not on
2497   the master side.
2498
2499   """
2500   def Run(self, name, idata):
2501     """Run an iallocator script.
2502
2503     @type name: str
2504     @param name: the iallocator script name
2505     @type idata: str
2506     @param idata: the allocator input data
2507
2508     @rtype: tuple
2509     @return: four element tuple of:
2510        - run status (one of the IARUN_ constants)
2511        - stdout
2512        - stderr
2513        - fail reason (as from L{utils.RunResult})
2514
2515     """
2516     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2517                                   os.path.isfile)
2518     if alloc_script is None:
2519       return (constants.IARUN_NOTFOUND, None, None, None)
2520
2521     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2522     try:
2523       os.write(fd, idata)
2524       os.close(fd)
2525       result = utils.RunCmd([alloc_script, fin_name])
2526       if result.failed:
2527         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2528                 result.fail_reason)
2529     finally:
2530       os.unlink(fin_name)
2531
2532     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2533
2534
2535 class DevCacheManager(object):
2536   """Simple class for managing a cache of block device information.
2537
2538   """
2539   _DEV_PREFIX = "/dev/"
2540   _ROOT_DIR = constants.BDEV_CACHE_DIR
2541
2542   @classmethod
2543   def _ConvertPath(cls, dev_path):
2544     """Converts a /dev/name path to the cache file name.
2545
2546     This replaces slashes with underscores and strips the /dev
2547     prefix. It then returns the full path to the cache file.
2548
2549     @type dev_path: str
2550     @param dev_path: the C{/dev/} path name
2551     @rtype: str
2552     @return: the converted path name
2553
2554     """
2555     if dev_path.startswith(cls._DEV_PREFIX):
2556       dev_path = dev_path[len(cls._DEV_PREFIX):]
2557     dev_path = dev_path.replace("/", "_")
2558     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2559     return fpath
2560
2561   @classmethod
2562   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2563     """Updates the cache information for a given device.
2564
2565     @type dev_path: str
2566     @param dev_path: the pathname of the device
2567     @type owner: str
2568     @param owner: the owner (instance name) of the device
2569     @type on_primary: bool
2570     @param on_primary: whether this is the primary
2571         node nor not
2572     @type iv_name: str
2573     @param iv_name: the instance-visible name of the
2574         device, as in objects.Disk.iv_name
2575
2576     @rtype: None
2577
2578     """
2579     if dev_path is None:
2580       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2581       return
2582     fpath = cls._ConvertPath(dev_path)
2583     if on_primary:
2584       state = "primary"
2585     else:
2586       state = "secondary"
2587     if iv_name is None:
2588       iv_name = "not_visible"
2589     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2590     try:
2591       utils.WriteFile(fpath, data=fdata)
2592     except EnvironmentError, err:
2593       logging.exception("Can't update bdev cache for %s", dev_path)
2594
2595   @classmethod
2596   def RemoveCache(cls, dev_path):
2597     """Remove data for a dev_path.
2598
2599     This is just a wrapper over L{utils.RemoveFile} with a converted
2600     path name and logging.
2601
2602     @type dev_path: str
2603     @param dev_path: the pathname of the device
2604
2605     @rtype: None
2606
2607     """
2608     if dev_path is None:
2609       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2610       return
2611     fpath = cls._ConvertPath(dev_path)
2612     try:
2613       utils.RemoveFile(fpath)
2614     except EnvironmentError, err:
2615       logging.exception("Can't update bdev cache for %s", dev_path)