f2ac504f7da2fa8c3dae9667bd98d4fef836daa5
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 class RPCFail(Exception):
50   """Class denoting RPC failure.
51
52   Its argument is the error message.
53
54   """
55
56 def _Fail(msg, *args, **kwargs):
57   """Log an error and the raise an RPCFail exception.
58
59   This exception is then handled specially in the ganeti daemon and
60   turned into a 'failed' return type. As such, this function is a
61   useful shortcut for logging the error and returning it to the master
62   daemon.
63
64   @type msg: string
65   @param msg: the text of the exception
66   @raise RPCFail
67
68   """
69   if args:
70     msg = msg % args
71   if "exc" in kwargs and kwargs["exc"]:
72     logging.exception(msg)
73   else:
74     logging.error(msg)
75   raise RPCFail(msg)
76
77
78 def _GetConfig():
79   """Simple wrapper to return a SimpleStore.
80
81   @rtype: L{ssconf.SimpleStore}
82   @return: a SimpleStore instance
83
84   """
85   return ssconf.SimpleStore()
86
87
88 def _GetSshRunner(cluster_name):
89   """Simple wrapper to return an SshRunner.
90
91   @type cluster_name: str
92   @param cluster_name: the cluster name, which is needed
93       by the SshRunner constructor
94   @rtype: L{ssh.SshRunner}
95   @return: an SshRunner instance
96
97   """
98   return ssh.SshRunner(cluster_name)
99
100
101 def _Decompress(data):
102   """Unpacks data compressed by the RPC client.
103
104   @type data: list or tuple
105   @param data: Data sent by RPC client
106   @rtype: str
107   @return: Decompressed data
108
109   """
110   assert isinstance(data, (list, tuple))
111   assert len(data) == 2
112   (encoding, content) = data
113   if encoding == constants.RPC_ENCODING_NONE:
114     return content
115   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
116     return zlib.decompress(base64.b64decode(content))
117   else:
118     raise AssertionError("Unknown data encoding")
119
120
121 def _CleanDirectory(path, exclude=None):
122   """Removes all regular files in a directory.
123
124   @type path: str
125   @param path: the directory to clean
126   @type exclude: list
127   @param exclude: list of files to be excluded, defaults
128       to the empty list
129
130   """
131   if not os.path.isdir(path):
132     return
133   if exclude is None:
134     exclude = []
135   else:
136     # Normalize excluded paths
137     exclude = [os.path.normpath(i) for i in exclude]
138
139   for rel_name in utils.ListVisibleFiles(path):
140     full_name = os.path.normpath(os.path.join(path, rel_name))
141     if full_name in exclude:
142       continue
143     if os.path.isfile(full_name) and not os.path.islink(full_name):
144       utils.RemoveFile(full_name)
145
146
147 def JobQueuePurge():
148   """Removes job queue files and archived jobs.
149
150   @rtype: None
151
152   """
153   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
154   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
155
156
157 def GetMasterInfo():
158   """Returns master information.
159
160   This is an utility function to compute master information, either
161   for consumption here or from the node daemon.
162
163   @rtype: tuple
164   @return: (master_netdev, master_ip, master_name) if we have a good
165       configuration, otherwise (None, None, None)
166
167   """
168   try:
169     cfg = _GetConfig()
170     master_netdev = cfg.GetMasterNetdev()
171     master_ip = cfg.GetMasterIP()
172     master_node = cfg.GetMasterNode()
173   except errors.ConfigurationError, err:
174     logging.exception("Cluster configuration incomplete")
175     return (None, None, None)
176   return (master_netdev, master_ip, master_node)
177
178
179 def StartMaster(start_daemons):
180   """Activate local node as master node.
181
182   The function will always try activate the IP address of the master
183   (unless someone else has it). It will also start the master daemons,
184   based on the start_daemons parameter.
185
186   @type start_daemons: boolean
187   @param start_daemons: whther to also start the master
188       daemons (ganeti-masterd and ganeti-rapi)
189   @rtype: None
190
191   """
192   ok = True
193   master_netdev, master_ip, _ = GetMasterInfo()
194   if not master_netdev:
195     return False
196
197   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
198     if utils.OwnIpAddress(master_ip):
199       # we already have the ip:
200       logging.debug("Already started")
201     else:
202       logging.error("Someone else has the master ip, not activating")
203       ok = False
204   else:
205     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
206                            "dev", master_netdev, "label",
207                            "%s:0" % master_netdev])
208     if result.failed:
209       logging.error("Can't activate master IP: %s", result.output)
210       ok = False
211
212     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
213                            "-s", master_ip, master_ip])
214     # we'll ignore the exit code of arping
215
216   # and now start the master and rapi daemons
217   if start_daemons:
218     for daemon in 'ganeti-masterd', 'ganeti-rapi':
219       result = utils.RunCmd([daemon])
220       if result.failed:
221         logging.error("Can't start daemon %s: %s", daemon, result.output)
222         ok = False
223   return ok
224
225
226 def StopMaster(stop_daemons):
227   """Deactivate this node as master.
228
229   The function will always try to deactivate the IP address of the
230   master. It will also stop the master daemons depending on the
231   stop_daemons parameter.
232
233   @type stop_daemons: boolean
234   @param stop_daemons: whether to also stop the master daemons
235       (ganeti-masterd and ganeti-rapi)
236   @rtype: None
237
238   """
239   master_netdev, master_ip, _ = GetMasterInfo()
240   if not master_netdev:
241     return False
242
243   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
244                          "dev", master_netdev])
245   if result.failed:
246     logging.error("Can't remove the master IP, error: %s", result.output)
247     # but otherwise ignore the failure
248
249   if stop_daemons:
250     # stop/kill the rapi and the master daemon
251     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
252       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
253
254   return True
255
256
257 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
258   """Joins this node to the cluster.
259
260   This does the following:
261       - updates the hostkeys of the machine (rsa and dsa)
262       - adds the ssh private key to the user
263       - adds the ssh public key to the users' authorized_keys file
264
265   @type dsa: str
266   @param dsa: the DSA private key to write
267   @type dsapub: str
268   @param dsapub: the DSA public key to write
269   @type rsa: str
270   @param rsa: the RSA private key to write
271   @type rsapub: str
272   @param rsapub: the RSA public key to write
273   @type sshkey: str
274   @param sshkey: the SSH private key to write
275   @type sshpub: str
276   @param sshpub: the SSH public key to write
277   @rtype: boolean
278   @return: the success of the operation
279
280   """
281   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
282                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
283                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
284                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
285   for name, content, mode in sshd_keys:
286     utils.WriteFile(name, data=content, mode=mode)
287
288   try:
289     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
290                                                     mkdir=True)
291   except errors.OpExecError, err:
292     _Fail("Error while processing user ssh files: %s", err, exc=True)
293
294   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
295     utils.WriteFile(name, data=content, mode=0600)
296
297   utils.AddAuthorizedKey(auth_keys, sshpub)
298
299   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
300
301   return (True, "Node added successfully")
302
303
304 def LeaveCluster():
305   """Cleans up and remove the current node.
306
307   This function cleans up and prepares the current node to be removed
308   from the cluster.
309
310   If processing is successful, then it raises an
311   L{errors.QuitGanetiException} which is used as a special case to
312   shutdown the node daemon.
313
314   """
315   _CleanDirectory(constants.DATA_DIR)
316   JobQueuePurge()
317
318   try:
319     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
320   except errors.OpExecError:
321     logging.exception("Error while processing ssh files")
322     return
323
324   f = open(pub_key, 'r')
325   try:
326     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
327   finally:
328     f.close()
329
330   utils.RemoveFile(priv_key)
331   utils.RemoveFile(pub_key)
332
333   # Return a reassuring string to the caller, and quit
334   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
335
336
337 def GetNodeInfo(vgname, hypervisor_type):
338   """Gives back a hash with different informations about the node.
339
340   @type vgname: C{string}
341   @param vgname: the name of the volume group to ask for disk space information
342   @type hypervisor_type: C{str}
343   @param hypervisor_type: the name of the hypervisor to ask for
344       memory information
345   @rtype: C{dict}
346   @return: dictionary with the following keys:
347       - vg_size is the size of the configured volume group in MiB
348       - vg_free is the free size of the volume group in MiB
349       - memory_dom0 is the memory allocated for domain0 in MiB
350       - memory_free is the currently available (free) ram in MiB
351       - memory_total is the total number of ram in MiB
352
353   """
354   outputarray = {}
355   vginfo = _GetVGInfo(vgname)
356   outputarray['vg_size'] = vginfo['vg_size']
357   outputarray['vg_free'] = vginfo['vg_free']
358
359   hyper = hypervisor.GetHypervisor(hypervisor_type)
360   hyp_info = hyper.GetNodeInfo()
361   if hyp_info is not None:
362     outputarray.update(hyp_info)
363
364   f = open("/proc/sys/kernel/random/boot_id", 'r')
365   try:
366     outputarray["bootid"] = f.read(128).rstrip("\n")
367   finally:
368     f.close()
369
370   return outputarray
371
372
373 def VerifyNode(what, cluster_name):
374   """Verify the status of the local node.
375
376   Based on the input L{what} parameter, various checks are done on the
377   local node.
378
379   If the I{filelist} key is present, this list of
380   files is checksummed and the file/checksum pairs are returned.
381
382   If the I{nodelist} key is present, we check that we have
383   connectivity via ssh with the target nodes (and check the hostname
384   report).
385
386   If the I{node-net-test} key is present, we check that we have
387   connectivity to the given nodes via both primary IP and, if
388   applicable, secondary IPs.
389
390   @type what: C{dict}
391   @param what: a dictionary of things to check:
392       - filelist: list of files for which to compute checksums
393       - nodelist: list of nodes we should check ssh communication with
394       - node-net-test: list of nodes we should check node daemon port
395         connectivity with
396       - hypervisor: list with hypervisors to run the verify for
397   @rtype: dict
398   @return: a dictionary with the same keys as the input dict, and
399       values representing the result of the checks
400
401   """
402   result = {}
403
404   if constants.NV_HYPERVISOR in what:
405     result[constants.NV_HYPERVISOR] = tmp = {}
406     for hv_name in what[constants.NV_HYPERVISOR]:
407       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
408
409   if constants.NV_FILELIST in what:
410     result[constants.NV_FILELIST] = utils.FingerprintFiles(
411       what[constants.NV_FILELIST])
412
413   if constants.NV_NODELIST in what:
414     result[constants.NV_NODELIST] = tmp = {}
415     random.shuffle(what[constants.NV_NODELIST])
416     for node in what[constants.NV_NODELIST]:
417       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
418       if not success:
419         tmp[node] = message
420
421   if constants.NV_NODENETTEST in what:
422     result[constants.NV_NODENETTEST] = tmp = {}
423     my_name = utils.HostInfo().name
424     my_pip = my_sip = None
425     for name, pip, sip in what[constants.NV_NODENETTEST]:
426       if name == my_name:
427         my_pip = pip
428         my_sip = sip
429         break
430     if not my_pip:
431       tmp[my_name] = ("Can't find my own primary/secondary IP"
432                       " in the node list")
433     else:
434       port = utils.GetNodeDaemonPort()
435       for name, pip, sip in what[constants.NV_NODENETTEST]:
436         fail = []
437         if not utils.TcpPing(pip, port, source=my_pip):
438           fail.append("primary")
439         if sip != pip:
440           if not utils.TcpPing(sip, port, source=my_sip):
441             fail.append("secondary")
442         if fail:
443           tmp[name] = ("failure using the %s interface(s)" %
444                        " and ".join(fail))
445
446   if constants.NV_LVLIST in what:
447     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
448
449   if constants.NV_INSTANCELIST in what:
450     result[constants.NV_INSTANCELIST] = GetInstanceList(
451       what[constants.NV_INSTANCELIST])
452
453   if constants.NV_VGLIST in what:
454     result[constants.NV_VGLIST] = ListVolumeGroups()
455
456   if constants.NV_VERSION in what:
457     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
458                                     constants.RELEASE_VERSION)
459
460   if constants.NV_HVINFO in what:
461     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
462     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
463
464   if constants.NV_DRBDLIST in what:
465     try:
466       used_minors = bdev.DRBD8.GetUsedDevs().keys()
467     except errors.BlockDeviceError, err:
468       logging.warning("Can't get used minors list", exc_info=True)
469       used_minors = str(err)
470     result[constants.NV_DRBDLIST] = used_minors
471
472   return result
473
474
475 def GetVolumeList(vg_name):
476   """Compute list of logical volumes and their size.
477
478   @type vg_name: str
479   @param vg_name: the volume group whose LVs we should list
480   @rtype: dict
481   @return:
482       dictionary of all partions (key) with value being a tuple of
483       their size (in MiB), inactive and online status::
484
485         {'test1': ('20.06', True, True)}
486
487       in case of errors, a string is returned with the error
488       details.
489
490   """
491   lvs = {}
492   sep = '|'
493   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
494                          "--separator=%s" % sep,
495                          "-olv_name,lv_size,lv_attr", vg_name])
496   if result.failed:
497     logging.error("Failed to list logical volumes, lvs output: %s",
498                   result.output)
499     return result.output
500
501   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
502   for line in result.stdout.splitlines():
503     line = line.strip()
504     match = valid_line_re.match(line)
505     if not match:
506       logging.error("Invalid line returned from lvs output: '%s'", line)
507       continue
508     name, size, attr = match.groups()
509     inactive = attr[4] == '-'
510     online = attr[5] == 'o'
511     lvs[name] = (size, inactive, online)
512
513   return lvs
514
515
516 def ListVolumeGroups():
517   """List the volume groups and their size.
518
519   @rtype: dict
520   @return: dictionary with keys volume name and values the
521       size of the volume
522
523   """
524   return utils.ListVolumeGroups()
525
526
527 def NodeVolumes():
528   """List all volumes on this node.
529
530   @rtype: list
531   @return:
532     A list of dictionaries, each having four keys:
533       - name: the logical volume name,
534       - size: the size of the logical volume
535       - dev: the physical device on which the LV lives
536       - vg: the volume group to which it belongs
537
538     In case of errors, we return an empty list and log the
539     error.
540
541     Note that since a logical volume can live on multiple physical
542     volumes, the resulting list might include a logical volume
543     multiple times.
544
545   """
546   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
547                          "--separator=|",
548                          "--options=lv_name,lv_size,devices,vg_name"])
549   if result.failed:
550     logging.error("Failed to list logical volumes, lvs output: %s",
551                   result.output)
552     return []
553
554   def parse_dev(dev):
555     if '(' in dev:
556       return dev.split('(')[0]
557     else:
558       return dev
559
560   def map_line(line):
561     return {
562       'name': line[0].strip(),
563       'size': line[1].strip(),
564       'dev': parse_dev(line[2].strip()),
565       'vg': line[3].strip(),
566     }
567
568   return [map_line(line.split('|')) for line in result.stdout.splitlines()
569           if line.count('|') >= 3]
570
571
572 def BridgesExist(bridges_list):
573   """Check if a list of bridges exist on the current node.
574
575   @rtype: boolean
576   @return: C{True} if all of them exist, C{False} otherwise
577
578   """
579   for bridge in bridges_list:
580     if not utils.BridgeExists(bridge):
581       return False
582
583   return True
584
585
586 def GetInstanceList(hypervisor_list):
587   """Provides a list of instances.
588
589   @type hypervisor_list: list
590   @param hypervisor_list: the list of hypervisors to query information
591
592   @rtype: list
593   @return: a list of all running instances on the current node
594     - instance1.example.com
595     - instance2.example.com
596
597   """
598   results = []
599   for hname in hypervisor_list:
600     try:
601       names = hypervisor.GetHypervisor(hname).ListInstances()
602       results.extend(names)
603     except errors.HypervisorError, err:
604       logging.exception("Error enumerating instances for hypevisor %s", hname)
605       raise
606
607   return results
608
609
610 def GetInstanceInfo(instance, hname):
611   """Gives back the informations about an instance as a dictionary.
612
613   @type instance: string
614   @param instance: the instance name
615   @type hname: string
616   @param hname: the hypervisor type of the instance
617
618   @rtype: dict
619   @return: dictionary with the following keys:
620       - memory: memory size of instance (int)
621       - state: xen state of instance (string)
622       - time: cpu time of instance (float)
623
624   """
625   output = {}
626
627   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
628   if iinfo is not None:
629     output['memory'] = iinfo[2]
630     output['state'] = iinfo[4]
631     output['time'] = iinfo[5]
632
633   return output
634
635
636 def GetInstanceMigratable(instance):
637   """Gives whether an instance can be migrated.
638
639   @type instance: L{objects.Instance}
640   @param instance: object representing the instance to be checked.
641
642   @rtype: tuple
643   @return: tuple of (result, description) where:
644       - result: whether the instance can be migrated or not
645       - description: a description of the issue, if relevant
646
647   """
648   hyper = hypervisor.GetHypervisor(instance.hypervisor)
649   if instance.name not in hyper.ListInstances():
650     return (False, 'not running')
651
652   for idx in range(len(instance.disks)):
653     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
654     if not os.path.islink(link_name):
655       return (False, 'not restarted since ganeti 1.2.5')
656
657   return (True, '')
658
659
660 def GetAllInstancesInfo(hypervisor_list):
661   """Gather data about all instances.
662
663   This is the equivalent of L{GetInstanceInfo}, except that it
664   computes data for all instances at once, thus being faster if one
665   needs data about more than one instance.
666
667   @type hypervisor_list: list
668   @param hypervisor_list: list of hypervisors to query for instance data
669
670   @rtype: dict
671   @return: dictionary of instance: data, with data having the following keys:
672       - memory: memory size of instance (int)
673       - state: xen state of instance (string)
674       - time: cpu time of instance (float)
675       - vcpus: the number of vcpus
676
677   """
678   output = {}
679
680   for hname in hypervisor_list:
681     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
682     if iinfo:
683       for name, inst_id, memory, vcpus, state, times in iinfo:
684         value = {
685           'memory': memory,
686           'vcpus': vcpus,
687           'state': state,
688           'time': times,
689           }
690         if name in output:
691           # we only check static parameters, like memory and vcpus,
692           # and not state and time which can change between the
693           # invocations of the different hypervisors
694           for key in 'memory', 'vcpus':
695             if value[key] != output[name][key]:
696               raise errors.HypervisorError("Instance %s is running twice"
697                                            " with different parameters" % name)
698         output[name] = value
699
700   return output
701
702
703 def InstanceOsAdd(instance, reinstall):
704   """Add an OS to an instance.
705
706   @type instance: L{objects.Instance}
707   @param instance: Instance whose OS is to be installed
708   @type reinstall: boolean
709   @param reinstall: whether this is an instance reinstall
710   @rtype: boolean
711   @return: the success of the operation
712
713   """
714   try:
715     inst_os = OSFromDisk(instance.os)
716   except errors.InvalidOS, err:
717     os_name, os_dir, os_err = err.args
718     if os_dir is None:
719       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
720     else:
721       return (False, "Error parsing OS '%s' in directory %s: %s" %
722               (os_name, os_dir, os_err))
723
724   create_env = OSEnvironment(instance)
725   if reinstall:
726     create_env['INSTANCE_REINSTALL'] = "1"
727
728   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
729                                      instance.name, int(time.time()))
730
731   result = utils.RunCmd([inst_os.create_script], env=create_env,
732                         cwd=inst_os.path, output=logfile,)
733   if result.failed:
734     logging.error("os create command '%s' returned error: %s, logfile: %s,"
735                   " output: %s", result.cmd, result.fail_reason, logfile,
736                   result.output)
737     lines = [utils.SafeEncode(val)
738              for val in utils.TailFile(logfile, lines=20)]
739     return (False, "OS create script failed (%s), last lines in the"
740             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
741
742   return (True, "Successfully installed")
743
744
745 def RunRenameInstance(instance, old_name):
746   """Run the OS rename script for an instance.
747
748   @type instance: L{objects.Instance}
749   @param instance: Instance whose OS is to be installed
750   @type old_name: string
751   @param old_name: previous instance name
752   @rtype: boolean
753   @return: the success of the operation
754
755   """
756   inst_os = OSFromDisk(instance.os)
757
758   rename_env = OSEnvironment(instance)
759   rename_env['OLD_INSTANCE_NAME'] = old_name
760
761   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
762                                            old_name,
763                                            instance.name, int(time.time()))
764
765   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
766                         cwd=inst_os.path, output=logfile)
767
768   if result.failed:
769     logging.error("os create command '%s' returned error: %s output: %s",
770                   result.cmd, result.fail_reason, result.output)
771     lines = [utils.SafeEncode(val)
772              for val in utils.TailFile(logfile, lines=20)]
773     return (False, "OS rename script failed (%s), last lines in the"
774             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
775
776   return (True, "Rename successful")
777
778
779 def _GetVGInfo(vg_name):
780   """Get informations about the volume group.
781
782   @type vg_name: str
783   @param vg_name: the volume group which we query
784   @rtype: dict
785   @return:
786     A dictionary with the following keys:
787       - C{vg_size} is the total size of the volume group in MiB
788       - C{vg_free} is the free size of the volume group in MiB
789       - C{pv_count} are the number of physical disks in that VG
790
791     If an error occurs during gathering of data, we return the same dict
792     with keys all set to None.
793
794   """
795   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
796
797   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
798                          "--nosuffix", "--units=m", "--separator=:", vg_name])
799
800   if retval.failed:
801     logging.error("volume group %s not present", vg_name)
802     return retdic
803   valarr = retval.stdout.strip().rstrip(':').split(':')
804   if len(valarr) == 3:
805     try:
806       retdic = {
807         "vg_size": int(round(float(valarr[0]), 0)),
808         "vg_free": int(round(float(valarr[1]), 0)),
809         "pv_count": int(valarr[2]),
810         }
811     except ValueError, err:
812       logging.exception("Fail to parse vgs output")
813   else:
814     logging.error("vgs output has the wrong number of fields (expected"
815                   " three): %s", str(valarr))
816   return retdic
817
818
819 def _GetBlockDevSymlinkPath(instance_name, idx):
820   return os.path.join(constants.DISK_LINKS_DIR,
821                       "%s:%d" % (instance_name, idx))
822
823
824 def _SymlinkBlockDev(instance_name, device_path, idx):
825   """Set up symlinks to a instance's block device.
826
827   This is an auxiliary function run when an instance is start (on the primary
828   node) or when an instance is migrated (on the target node).
829
830
831   @param instance_name: the name of the target instance
832   @param device_path: path of the physical block device, on the node
833   @param idx: the disk index
834   @return: absolute path to the disk's symlink
835
836   """
837   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
838   try:
839     os.symlink(device_path, link_name)
840   except OSError, err:
841     if err.errno == errno.EEXIST:
842       if (not os.path.islink(link_name) or
843           os.readlink(link_name) != device_path):
844         os.remove(link_name)
845         os.symlink(device_path, link_name)
846     else:
847       raise
848
849   return link_name
850
851
852 def _RemoveBlockDevLinks(instance_name, disks):
853   """Remove the block device symlinks belonging to the given instance.
854
855   """
856   for idx, disk in enumerate(disks):
857     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
858     if os.path.islink(link_name):
859       try:
860         os.remove(link_name)
861       except OSError:
862         logging.exception("Can't remove symlink '%s'", link_name)
863
864
865 def _GatherAndLinkBlockDevs(instance):
866   """Set up an instance's block device(s).
867
868   This is run on the primary node at instance startup. The block
869   devices must be already assembled.
870
871   @type instance: L{objects.Instance}
872   @param instance: the instance whose disks we shoul assemble
873   @rtype: list
874   @return: list of (disk_object, device_path)
875
876   """
877   block_devices = []
878   for idx, disk in enumerate(instance.disks):
879     device = _RecursiveFindBD(disk)
880     if device is None:
881       raise errors.BlockDeviceError("Block device '%s' is not set up." %
882                                     str(disk))
883     device.Open()
884     try:
885       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
886     except OSError, e:
887       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
888                                     e.strerror)
889
890     block_devices.append((disk, link_name))
891
892   return block_devices
893
894
895 def StartInstance(instance):
896   """Start an instance.
897
898   @type instance: L{objects.Instance}
899   @param instance: the instance object
900   @rtype: boolean
901   @return: whether the startup was successful or not
902
903   """
904   running_instances = GetInstanceList([instance.hypervisor])
905
906   if instance.name in running_instances:
907     return (True, "Already running")
908
909   try:
910     block_devices = _GatherAndLinkBlockDevs(instance)
911     hyper = hypervisor.GetHypervisor(instance.hypervisor)
912     hyper.StartInstance(instance, block_devices)
913   except errors.BlockDeviceError, err:
914     _Fail("Block device error: %s", err, exc=True)
915   except errors.HypervisorError, err:
916     _RemoveBlockDevLinks(instance.name, instance.disks)
917     _Fail("Hypervisor error: %s", err, exc=True)
918
919   return (True, "Instance started successfully")
920
921
922 def InstanceShutdown(instance):
923   """Shut an instance down.
924
925   @note: this functions uses polling with a hardcoded timeout.
926
927   @type instance: L{objects.Instance}
928   @param instance: the instance object
929   @rtype: boolean
930   @return: whether the startup was successful or not
931
932   """
933   hv_name = instance.hypervisor
934   running_instances = GetInstanceList([hv_name])
935
936   if instance.name not in running_instances:
937     return (True, "Instance already stopped")
938
939   hyper = hypervisor.GetHypervisor(hv_name)
940   try:
941     hyper.StopInstance(instance)
942   except errors.HypervisorError, err:
943     _Fail("Failed to stop instance %s: %s", instance.name, err)
944
945   # test every 10secs for 2min
946
947   time.sleep(1)
948   for dummy in range(11):
949     if instance.name not in GetInstanceList([hv_name]):
950       break
951     time.sleep(10)
952   else:
953     # the shutdown did not succeed
954     logging.error("Shutdown of '%s' unsuccessful, using destroy",
955                   instance.name)
956
957     try:
958       hyper.StopInstance(instance, force=True)
959     except errors.HypervisorError, err:
960       _Fail("Failed to force stop instance %s: %s", instance.name, err)
961
962     time.sleep(1)
963     if instance.name in GetInstanceList([hv_name]):
964       _Fail("Could not shutdown instance %s even by destroy", instance.name)
965
966   _RemoveBlockDevLinks(instance.name, instance.disks)
967
968   return (True, "Instance has been shutdown successfully")
969
970
971 def InstanceReboot(instance, reboot_type):
972   """Reboot an instance.
973
974   @type instance: L{objects.Instance}
975   @param instance: the instance object to reboot
976   @type reboot_type: str
977   @param reboot_type: the type of reboot, one the following
978     constants:
979       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
980         instance OS, do not recreate the VM
981       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
982         restart the VM (at the hypervisor level)
983       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
984         is not accepted here, since that mode is handled
985         differently
986   @rtype: boolean
987   @return: the success of the operation
988
989   """
990   running_instances = GetInstanceList([instance.hypervisor])
991
992   if instance.name not in running_instances:
993     _Fail("Cannot reboot instance %s that is not running", instance.name)
994
995   hyper = hypervisor.GetHypervisor(instance.hypervisor)
996   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
997     try:
998       hyper.RebootInstance(instance)
999     except errors.HypervisorError, err:
1000       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1001   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1002     try:
1003       stop_result = InstanceShutdown(instance)
1004       if not stop_result[0]:
1005         return stop_result
1006       return StartInstance(instance)
1007     except errors.HypervisorError, err:
1008       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1009   else:
1010     _Fail("Invalid reboot_type received: %s", reboot_type)
1011
1012   return (True, "Reboot successful")
1013
1014
1015 def MigrationInfo(instance):
1016   """Gather information about an instance to be migrated.
1017
1018   @type instance: L{objects.Instance}
1019   @param instance: the instance definition
1020
1021   """
1022   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1023   try:
1024     info = hyper.MigrationInfo(instance)
1025   except errors.HypervisorError, err:
1026     _Fail("Failed to fetch migration information: %s", err, exc=True)
1027   return (True, info)
1028
1029
1030 def AcceptInstance(instance, info, target):
1031   """Prepare the node to accept an instance.
1032
1033   @type instance: L{objects.Instance}
1034   @param instance: the instance definition
1035   @type info: string/data (opaque)
1036   @param info: migration information, from the source node
1037   @type target: string
1038   @param target: target host (usually ip), on this node
1039
1040   """
1041   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1042   try:
1043     hyper.AcceptInstance(instance, info, target)
1044   except errors.HypervisorError, err:
1045     _Fail("Failed to accept instance: %s", err, exc=True)
1046   return (True, "Accept successfull")
1047
1048
1049 def FinalizeMigration(instance, info, success):
1050   """Finalize any preparation to accept an instance.
1051
1052   @type instance: L{objects.Instance}
1053   @param instance: the instance definition
1054   @type info: string/data (opaque)
1055   @param info: migration information, from the source node
1056   @type success: boolean
1057   @param success: whether the migration was a success or a failure
1058
1059   """
1060   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1061   try:
1062     hyper.FinalizeMigration(instance, info, success)
1063   except errors.HypervisorError, err:
1064     _Fail("Failed to finalize migration: %s", err, exc=True)
1065   return (True, "Migration Finalized")
1066
1067
1068 def MigrateInstance(instance, target, live):
1069   """Migrates an instance to another node.
1070
1071   @type instance: L{objects.Instance}
1072   @param instance: the instance definition
1073   @type target: string
1074   @param target: the target node name
1075   @type live: boolean
1076   @param live: whether the migration should be done live or not (the
1077       interpretation of this parameter is left to the hypervisor)
1078   @rtype: tuple
1079   @return: a tuple of (success, msg) where:
1080       - succes is a boolean denoting the success/failure of the operation
1081       - msg is a string with details in case of failure
1082
1083   """
1084   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1085
1086   try:
1087     hyper.MigrateInstance(instance.name, target, live)
1088   except errors.HypervisorError, err:
1089     _Fail("Failed to migrate instance: %s", err, exc=True)
1090   return (True, "Migration successfull")
1091
1092
1093 def BlockdevCreate(disk, size, owner, on_primary, info):
1094   """Creates a block device for an instance.
1095
1096   @type disk: L{objects.Disk}
1097   @param disk: the object describing the disk we should create
1098   @type size: int
1099   @param size: the size of the physical underlying device, in MiB
1100   @type owner: str
1101   @param owner: the name of the instance for which disk is created,
1102       used for device cache data
1103   @type on_primary: boolean
1104   @param on_primary:  indicates if it is the primary node or not
1105   @type info: string
1106   @param info: string that will be sent to the physical device
1107       creation, used for example to set (LVM) tags on LVs
1108
1109   @return: the new unique_id of the device (this can sometime be
1110       computed only after creation), or None. On secondary nodes,
1111       it's not required to return anything.
1112
1113   """
1114   clist = []
1115   if disk.children:
1116     for child in disk.children:
1117       try:
1118         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1119       except errors.BlockDeviceError, err:
1120         _Fail("Can't assemble device %s: %s", child, err)
1121       if on_primary or disk.AssembleOnSecondary():
1122         # we need the children open in case the device itself has to
1123         # be assembled
1124         try:
1125           crdev.Open()
1126         except errors.BlockDeviceError, err:
1127           _Fail("Can't make child '%s' read-write: %s", child, err)
1128       clist.append(crdev)
1129
1130   try:
1131     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1132   except errors.BlockDeviceError, err:
1133     _Fail("Can't create block device: %s", err)
1134
1135   if on_primary or disk.AssembleOnSecondary():
1136     try:
1137       device.Assemble()
1138     except errors.BlockDeviceError, err:
1139       _Fail("Can't assemble device after creation, unusual event: %s", err)
1140     device.SetSyncSpeed(constants.SYNC_SPEED)
1141     if on_primary or disk.OpenOnSecondary():
1142       try:
1143         device.Open(force=True)
1144       except errors.BlockDeviceError, err:
1145         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1146     DevCacheManager.UpdateCache(device.dev_path, owner,
1147                                 on_primary, disk.iv_name)
1148
1149   device.SetInfo(info)
1150
1151   physical_id = device.unique_id
1152   return True, physical_id
1153
1154
1155 def BlockdevRemove(disk):
1156   """Remove a block device.
1157
1158   @note: This is intended to be called recursively.
1159
1160   @type disk: L{objects.Disk}
1161   @param disk: the disk object we should remove
1162   @rtype: boolean
1163   @return: the success of the operation
1164
1165   """
1166   msgs = []
1167   result = True
1168   try:
1169     rdev = _RecursiveFindBD(disk)
1170   except errors.BlockDeviceError, err:
1171     # probably can't attach
1172     logging.info("Can't attach to device %s in remove", disk)
1173     rdev = None
1174   if rdev is not None:
1175     r_path = rdev.dev_path
1176     try:
1177       rdev.Remove()
1178     except errors.BlockDeviceError, err:
1179       msgs.append(str(err))
1180       result = False
1181     if result:
1182       DevCacheManager.RemoveCache(r_path)
1183
1184   if disk.children:
1185     for child in disk.children:
1186       c_status, c_msg = BlockdevRemove(child)
1187       result = result and c_status
1188       if c_msg: # not an empty message
1189         msgs.append(c_msg)
1190
1191   return (result, "; ".join(msgs))
1192
1193
1194 def _RecursiveAssembleBD(disk, owner, as_primary):
1195   """Activate a block device for an instance.
1196
1197   This is run on the primary and secondary nodes for an instance.
1198
1199   @note: this function is called recursively.
1200
1201   @type disk: L{objects.Disk}
1202   @param disk: the disk we try to assemble
1203   @type owner: str
1204   @param owner: the name of the instance which owns the disk
1205   @type as_primary: boolean
1206   @param as_primary: if we should make the block device
1207       read/write
1208
1209   @return: the assembled device or None (in case no device
1210       was assembled)
1211   @raise errors.BlockDeviceError: in case there is an error
1212       during the activation of the children or the device
1213       itself
1214
1215   """
1216   children = []
1217   if disk.children:
1218     mcn = disk.ChildrenNeeded()
1219     if mcn == -1:
1220       mcn = 0 # max number of Nones allowed
1221     else:
1222       mcn = len(disk.children) - mcn # max number of Nones
1223     for chld_disk in disk.children:
1224       try:
1225         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1226       except errors.BlockDeviceError, err:
1227         if children.count(None) >= mcn:
1228           raise
1229         cdev = None
1230         logging.error("Error in child activation (but continuing): %s",
1231                       str(err))
1232       children.append(cdev)
1233
1234   if as_primary or disk.AssembleOnSecondary():
1235     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1236     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1237     result = r_dev
1238     if as_primary or disk.OpenOnSecondary():
1239       r_dev.Open()
1240     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1241                                 as_primary, disk.iv_name)
1242
1243   else:
1244     result = True
1245   return result
1246
1247
1248 def BlockdevAssemble(disk, owner, as_primary):
1249   """Activate a block device for an instance.
1250
1251   This is a wrapper over _RecursiveAssembleBD.
1252
1253   @rtype: str or boolean
1254   @return: a C{/dev/...} path for primary nodes, and
1255       C{True} for secondary nodes
1256
1257   """
1258   status = True
1259   result = "no error information"
1260   try:
1261     result = _RecursiveAssembleBD(disk, owner, as_primary)
1262     if isinstance(result, bdev.BlockDev):
1263       result = result.dev_path
1264   except errors.BlockDeviceError, err:
1265     result = "Error while assembling disk: %s" % str(err)
1266     status = False
1267   return (status, result)
1268
1269
1270 def BlockdevShutdown(disk):
1271   """Shut down a block device.
1272
1273   First, if the device is assembled (Attach() is successfull), then
1274   the device is shutdown. Then the children of the device are
1275   shutdown.
1276
1277   This function is called recursively. Note that we don't cache the
1278   children or such, as oppossed to assemble, shutdown of different
1279   devices doesn't require that the upper device was active.
1280
1281   @type disk: L{objects.Disk}
1282   @param disk: the description of the disk we should
1283       shutdown
1284   @rtype: boolean
1285   @return: the success of the operation
1286
1287   """
1288   msgs = []
1289   result = True
1290   r_dev = _RecursiveFindBD(disk)
1291   if r_dev is not None:
1292     r_path = r_dev.dev_path
1293     try:
1294       r_dev.Shutdown()
1295       DevCacheManager.RemoveCache(r_path)
1296     except errors.BlockDeviceError, err:
1297       msgs.append(str(err))
1298       result = False
1299
1300   if disk.children:
1301     for child in disk.children:
1302       c_status, c_msg = BlockdevShutdown(child)
1303       result = result and c_status
1304       if c_msg: # not an empty message
1305         msgs.append(c_msg)
1306
1307   return (result, "; ".join(msgs))
1308
1309
1310 def BlockdevAddchildren(parent_cdev, new_cdevs):
1311   """Extend a mirrored block device.
1312
1313   @type parent_cdev: L{objects.Disk}
1314   @param parent_cdev: the disk to which we should add children
1315   @type new_cdevs: list of L{objects.Disk}
1316   @param new_cdevs: the list of children which we should add
1317   @rtype: boolean
1318   @return: the success of the operation
1319
1320   """
1321   parent_bdev = _RecursiveFindBD(parent_cdev)
1322   if parent_bdev is None:
1323     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1324   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1325   if new_bdevs.count(None) > 0:
1326     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1327   parent_bdev.AddChildren(new_bdevs)
1328   return (True, None)
1329
1330
1331 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1332   """Shrink a mirrored block device.
1333
1334   @type parent_cdev: L{objects.Disk}
1335   @param parent_cdev: the disk from which we should remove children
1336   @type new_cdevs: list of L{objects.Disk}
1337   @param new_cdevs: the list of children which we should remove
1338   @rtype: boolean
1339   @return: the success of the operation
1340
1341   """
1342   parent_bdev = _RecursiveFindBD(parent_cdev)
1343   if parent_bdev is None:
1344     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1345   devs = []
1346   for disk in new_cdevs:
1347     rpath = disk.StaticDevPath()
1348     if rpath is None:
1349       bd = _RecursiveFindBD(disk)
1350       if bd is None:
1351         _Fail("Can't find device %s while removing children", disk)
1352       else:
1353         devs.append(bd.dev_path)
1354     else:
1355       devs.append(rpath)
1356   parent_bdev.RemoveChildren(devs)
1357   return (True, None)
1358
1359
1360 def BlockdevGetmirrorstatus(disks):
1361   """Get the mirroring status of a list of devices.
1362
1363   @type disks: list of L{objects.Disk}
1364   @param disks: the list of disks which we should query
1365   @rtype: disk
1366   @return:
1367       a list of (mirror_done, estimated_time) tuples, which
1368       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1369   @raise errors.BlockDeviceError: if any of the disks cannot be
1370       found
1371
1372   """
1373   stats = []
1374   for dsk in disks:
1375     rbd = _RecursiveFindBD(dsk)
1376     if rbd is None:
1377       _Fail("Can't find device %s", dsk)
1378     stats.append(rbd.CombinedSyncStatus())
1379   return True, stats
1380
1381
1382 def _RecursiveFindBD(disk):
1383   """Check if a device is activated.
1384
1385   If so, return informations about the real device.
1386
1387   @type disk: L{objects.Disk}
1388   @param disk: the disk object we need to find
1389
1390   @return: None if the device can't be found,
1391       otherwise the device instance
1392
1393   """
1394   children = []
1395   if disk.children:
1396     for chdisk in disk.children:
1397       children.append(_RecursiveFindBD(chdisk))
1398
1399   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1400
1401
1402 def BlockdevFind(disk):
1403   """Check if a device is activated.
1404
1405   If it is, return informations about the real device.
1406
1407   @type disk: L{objects.Disk}
1408   @param disk: the disk to find
1409   @rtype: None or tuple
1410   @return: None if the disk cannot be found, otherwise a
1411       tuple (device_path, major, minor, sync_percent,
1412       estimated_time, is_degraded)
1413
1414   """
1415   try:
1416     rbd = _RecursiveFindBD(disk)
1417   except errors.BlockDeviceError, err:
1418     _Fail("Failed to find device: %s", err, exc=True)
1419   if rbd is None:
1420     return (True, None)
1421   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1422
1423
1424 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1425   """Write a file to the filesystem.
1426
1427   This allows the master to overwrite(!) a file. It will only perform
1428   the operation if the file belongs to a list of configuration files.
1429
1430   @type file_name: str
1431   @param file_name: the target file name
1432   @type data: str
1433   @param data: the new contents of the file
1434   @type mode: int
1435   @param mode: the mode to give the file (can be None)
1436   @type uid: int
1437   @param uid: the owner of the file (can be -1 for default)
1438   @type gid: int
1439   @param gid: the group of the file (can be -1 for default)
1440   @type atime: float
1441   @param atime: the atime to set on the file (can be None)
1442   @type mtime: float
1443   @param mtime: the mtime to set on the file (can be None)
1444   @rtype: boolean
1445   @return: the success of the operation; errors are logged
1446       in the node daemon log
1447
1448   """
1449   if not os.path.isabs(file_name):
1450     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1451
1452   allowed_files = set([
1453     constants.CLUSTER_CONF_FILE,
1454     constants.ETC_HOSTS,
1455     constants.SSH_KNOWN_HOSTS_FILE,
1456     constants.VNC_PASSWORD_FILE,
1457     constants.RAPI_CERT_FILE,
1458     constants.RAPI_USERS_FILE,
1459     ])
1460
1461   for hv_name in constants.HYPER_TYPES:
1462     hv_class = hypervisor.GetHypervisor(hv_name)
1463     allowed_files.update(hv_class.GetAncillaryFiles())
1464
1465   if file_name not in allowed_files:
1466     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1467           file_name)
1468
1469   raw_data = _Decompress(data)
1470
1471   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1472                   atime=atime, mtime=mtime)
1473   return (True, "success")
1474
1475
1476 def WriteSsconfFiles(values):
1477   """Update all ssconf files.
1478
1479   Wrapper around the SimpleStore.WriteFiles.
1480
1481   """
1482   ssconf.SimpleStore().WriteFiles(values)
1483
1484
1485 def _ErrnoOrStr(err):
1486   """Format an EnvironmentError exception.
1487
1488   If the L{err} argument has an errno attribute, it will be looked up
1489   and converted into a textual C{E...} description. Otherwise the
1490   string representation of the error will be returned.
1491
1492   @type err: L{EnvironmentError}
1493   @param err: the exception to format
1494
1495   """
1496   if hasattr(err, 'errno'):
1497     detail = errno.errorcode[err.errno]
1498   else:
1499     detail = str(err)
1500   return detail
1501
1502
1503 def _OSOndiskVersion(name, os_dir):
1504   """Compute and return the API version of a given OS.
1505
1506   This function will try to read the API version of the OS given by
1507   the 'name' parameter and residing in the 'os_dir' directory.
1508
1509   @type name: str
1510   @param name: the OS name we should look for
1511   @type os_dir: str
1512   @param os_dir: the directory inwhich we should look for the OS
1513   @rtype: int or None
1514   @return:
1515       Either an integer denoting the version or None in the
1516       case when this is not a valid OS name.
1517   @raise errors.InvalidOS: if the OS cannot be found
1518
1519   """
1520   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1521
1522   try:
1523     st = os.stat(api_file)
1524   except EnvironmentError, err:
1525     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1526                            " found (%s)" % _ErrnoOrStr(err))
1527
1528   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1529     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1530                            " a regular file")
1531
1532   try:
1533     f = open(api_file)
1534     try:
1535       api_versions = f.readlines()
1536     finally:
1537       f.close()
1538   except EnvironmentError, err:
1539     raise errors.InvalidOS(name, os_dir, "error while reading the"
1540                            " API version (%s)" % _ErrnoOrStr(err))
1541
1542   api_versions = [version.strip() for version in api_versions]
1543   try:
1544     api_versions = [int(version) for version in api_versions]
1545   except (TypeError, ValueError), err:
1546     raise errors.InvalidOS(name, os_dir,
1547                            "API version is not integer (%s)" % str(err))
1548
1549   return api_versions
1550
1551
1552 def DiagnoseOS(top_dirs=None):
1553   """Compute the validity for all OSes.
1554
1555   @type top_dirs: list
1556   @param top_dirs: the list of directories in which to
1557       search (if not given defaults to
1558       L{constants.OS_SEARCH_PATH})
1559   @rtype: list of L{objects.OS}
1560   @return: an OS object for each name in all the given
1561       directories
1562
1563   """
1564   if top_dirs is None:
1565     top_dirs = constants.OS_SEARCH_PATH
1566
1567   result = []
1568   for dir_name in top_dirs:
1569     if os.path.isdir(dir_name):
1570       try:
1571         f_names = utils.ListVisibleFiles(dir_name)
1572       except EnvironmentError, err:
1573         logging.exception("Can't list the OS directory %s", dir_name)
1574         break
1575       for name in f_names:
1576         try:
1577           os_inst = OSFromDisk(name, base_dir=dir_name)
1578           result.append(os_inst)
1579         except errors.InvalidOS, err:
1580           result.append(objects.OS.FromInvalidOS(err))
1581
1582   return result
1583
1584
1585 def OSFromDisk(name, base_dir=None):
1586   """Create an OS instance from disk.
1587
1588   This function will return an OS instance if the given name is a
1589   valid OS name. Otherwise, it will raise an appropriate
1590   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1591
1592   @type base_dir: string
1593   @keyword base_dir: Base directory containing OS installations.
1594                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1595   @rtype: L{objects.OS}
1596   @return: the OS instance if we find a valid one
1597   @raise errors.InvalidOS: if we don't find a valid OS
1598
1599   """
1600   if base_dir is None:
1601     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1602     if os_dir is None:
1603       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1604   else:
1605     os_dir = os.path.sep.join([base_dir, name])
1606
1607   api_versions = _OSOndiskVersion(name, os_dir)
1608
1609   if constants.OS_API_VERSION not in api_versions:
1610     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1611                            " (found %s want %s)"
1612                            % (api_versions, constants.OS_API_VERSION))
1613
1614   # OS Scripts dictionary, we will populate it with the actual script names
1615   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1616
1617   for script in os_scripts:
1618     os_scripts[script] = os.path.sep.join([os_dir, script])
1619
1620     try:
1621       st = os.stat(os_scripts[script])
1622     except EnvironmentError, err:
1623       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1624                              (script, _ErrnoOrStr(err)))
1625
1626     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1627       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1628                              script)
1629
1630     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1631       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1632                              script)
1633
1634
1635   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1636                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1637                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1638                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1639                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1640                     api_versions=api_versions)
1641
1642 def OSEnvironment(instance, debug=0):
1643   """Calculate the environment for an os script.
1644
1645   @type instance: L{objects.Instance}
1646   @param instance: target instance for the os script run
1647   @type debug: integer
1648   @param debug: debug level (0 or 1, for OS Api 10)
1649   @rtype: dict
1650   @return: dict of environment variables
1651   @raise errors.BlockDeviceError: if the block device
1652       cannot be found
1653
1654   """
1655   result = {}
1656   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1657   result['INSTANCE_NAME'] = instance.name
1658   result['INSTANCE_OS'] = instance.os
1659   result['HYPERVISOR'] = instance.hypervisor
1660   result['DISK_COUNT'] = '%d' % len(instance.disks)
1661   result['NIC_COUNT'] = '%d' % len(instance.nics)
1662   result['DEBUG_LEVEL'] = '%d' % debug
1663   for idx, disk in enumerate(instance.disks):
1664     real_disk = _RecursiveFindBD(disk)
1665     if real_disk is None:
1666       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1667                                     str(disk))
1668     real_disk.Open()
1669     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1670     result['DISK_%d_ACCESS' % idx] = disk.mode
1671     if constants.HV_DISK_TYPE in instance.hvparams:
1672       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1673         instance.hvparams[constants.HV_DISK_TYPE]
1674     if disk.dev_type in constants.LDS_BLOCK:
1675       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1676     elif disk.dev_type == constants.LD_FILE:
1677       result['DISK_%d_BACKEND_TYPE' % idx] = \
1678         'file:%s' % disk.physical_id[0]
1679   for idx, nic in enumerate(instance.nics):
1680     result['NIC_%d_MAC' % idx] = nic.mac
1681     if nic.ip:
1682       result['NIC_%d_IP' % idx] = nic.ip
1683     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1684     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1685       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1686     if nic.nicparams[constants.NIC_LINK]:
1687       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1688     if constants.HV_NIC_TYPE in instance.hvparams:
1689       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1690         instance.hvparams[constants.HV_NIC_TYPE]
1691
1692   return result
1693
1694 def BlockdevGrow(disk, amount):
1695   """Grow a stack of block devices.
1696
1697   This function is called recursively, with the childrens being the
1698   first ones to resize.
1699
1700   @type disk: L{objects.Disk}
1701   @param disk: the disk to be grown
1702   @rtype: (status, result)
1703   @return: a tuple with the status of the operation
1704       (True/False), and the errors message if status
1705       is False
1706
1707   """
1708   r_dev = _RecursiveFindBD(disk)
1709   if r_dev is None:
1710     return False, "Cannot find block device %s" % (disk,)
1711
1712   try:
1713     r_dev.Grow(amount)
1714   except errors.BlockDeviceError, err:
1715     _Fail("Failed to grow block device: %s", err, exc=True)
1716
1717   return True, None
1718
1719
1720 def BlockdevSnapshot(disk):
1721   """Create a snapshot copy of a block device.
1722
1723   This function is called recursively, and the snapshot is actually created
1724   just for the leaf lvm backend device.
1725
1726   @type disk: L{objects.Disk}
1727   @param disk: the disk to be snapshotted
1728   @rtype: string
1729   @return: snapshot disk path
1730
1731   """
1732   if disk.children:
1733     if len(disk.children) == 1:
1734       # only one child, let's recurse on it
1735       return BlockdevSnapshot(disk.children[0])
1736     else:
1737       # more than one child, choose one that matches
1738       for child in disk.children:
1739         if child.size == disk.size:
1740           # return implies breaking the loop
1741           return BlockdevSnapshot(child)
1742   elif disk.dev_type == constants.LD_LV:
1743     r_dev = _RecursiveFindBD(disk)
1744     if r_dev is not None:
1745       # let's stay on the safe side and ask for the full size, for now
1746       return True, r_dev.Snapshot(disk.size)
1747     else:
1748       _Fail("Cannot find block device %s", disk)
1749   else:
1750     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1751           disk.unique_id, disk.dev_type)
1752
1753
1754 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1755   """Export a block device snapshot to a remote node.
1756
1757   @type disk: L{objects.Disk}
1758   @param disk: the description of the disk to export
1759   @type dest_node: str
1760   @param dest_node: the destination node to export to
1761   @type instance: L{objects.Instance}
1762   @param instance: the instance object to whom the disk belongs
1763   @type cluster_name: str
1764   @param cluster_name: the cluster name, needed for SSH hostalias
1765   @type idx: int
1766   @param idx: the index of the disk in the instance's disk list,
1767       used to export to the OS scripts environment
1768   @rtype: boolean
1769   @return: the success of the operation
1770
1771   """
1772   export_env = OSEnvironment(instance)
1773
1774   inst_os = OSFromDisk(instance.os)
1775   export_script = inst_os.export_script
1776
1777   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1778                                      instance.name, int(time.time()))
1779   if not os.path.exists(constants.LOG_OS_DIR):
1780     os.mkdir(constants.LOG_OS_DIR, 0750)
1781   real_disk = _RecursiveFindBD(disk)
1782   if real_disk is None:
1783     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1784                                   str(disk))
1785   real_disk.Open()
1786
1787   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1788   export_env['EXPORT_INDEX'] = str(idx)
1789
1790   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1791   destfile = disk.physical_id[1]
1792
1793   # the target command is built out of three individual commands,
1794   # which are joined by pipes; we check each individual command for
1795   # valid parameters
1796   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1797                                export_script, logfile)
1798
1799   comprcmd = "gzip"
1800
1801   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1802                                 destdir, destdir, destfile)
1803   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1804                                                    constants.GANETI_RUNAS,
1805                                                    destcmd)
1806
1807   # all commands have been checked, so we're safe to combine them
1808   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1809
1810   result = utils.RunCmd(command, env=export_env)
1811
1812   if result.failed:
1813     logging.error("os snapshot export command '%s' returned error: %s"
1814                   " output: %s", command, result.fail_reason, result.output)
1815     return False
1816
1817   return True
1818
1819
1820 def FinalizeExport(instance, snap_disks):
1821   """Write out the export configuration information.
1822
1823   @type instance: L{objects.Instance}
1824   @param instance: the instance which we export, used for
1825       saving configuration
1826   @type snap_disks: list of L{objects.Disk}
1827   @param snap_disks: list of snapshot block devices, which
1828       will be used to get the actual name of the dump file
1829
1830   @rtype: boolean
1831   @return: the success of the operation
1832
1833   """
1834   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1835   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1836
1837   config = objects.SerializableConfigParser()
1838
1839   config.add_section(constants.INISECT_EXP)
1840   config.set(constants.INISECT_EXP, 'version', '0')
1841   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1842   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1843   config.set(constants.INISECT_EXP, 'os', instance.os)
1844   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1845
1846   config.add_section(constants.INISECT_INS)
1847   config.set(constants.INISECT_INS, 'name', instance.name)
1848   config.set(constants.INISECT_INS, 'memory', '%d' %
1849              instance.beparams[constants.BE_MEMORY])
1850   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1851              instance.beparams[constants.BE_VCPUS])
1852   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1853
1854   nic_total = 0
1855   for nic_count, nic in enumerate(instance.nics):
1856     nic_total += 1
1857     config.set(constants.INISECT_INS, 'nic%d_mac' %
1858                nic_count, '%s' % nic.mac)
1859     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1860     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1861                '%s' % nic.bridge)
1862   # TODO: redundant: on load can read nics until it doesn't exist
1863   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1864
1865   disk_total = 0
1866   for disk_count, disk in enumerate(snap_disks):
1867     if disk:
1868       disk_total += 1
1869       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1870                  ('%s' % disk.iv_name))
1871       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1872                  ('%s' % disk.physical_id[1]))
1873       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1874                  ('%d' % disk.size))
1875
1876   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1877
1878   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1879                   data=config.Dumps())
1880   shutil.rmtree(finaldestdir, True)
1881   shutil.move(destdir, finaldestdir)
1882
1883   return True
1884
1885
1886 def ExportInfo(dest):
1887   """Get export configuration information.
1888
1889   @type dest: str
1890   @param dest: directory containing the export
1891
1892   @rtype: L{objects.SerializableConfigParser}
1893   @return: a serializable config file containing the
1894       export info
1895
1896   """
1897   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1898
1899   config = objects.SerializableConfigParser()
1900   config.read(cff)
1901
1902   if (not config.has_section(constants.INISECT_EXP) or
1903       not config.has_section(constants.INISECT_INS)):
1904     return None
1905
1906   return config
1907
1908
1909 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1910   """Import an os image into an instance.
1911
1912   @type instance: L{objects.Instance}
1913   @param instance: instance to import the disks into
1914   @type src_node: string
1915   @param src_node: source node for the disk images
1916   @type src_images: list of string
1917   @param src_images: absolute paths of the disk images
1918   @rtype: list of boolean
1919   @return: each boolean represent the success of importing the n-th disk
1920
1921   """
1922   import_env = OSEnvironment(instance)
1923   inst_os = OSFromDisk(instance.os)
1924   import_script = inst_os.import_script
1925
1926   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1927                                         instance.name, int(time.time()))
1928   if not os.path.exists(constants.LOG_OS_DIR):
1929     os.mkdir(constants.LOG_OS_DIR, 0750)
1930
1931   comprcmd = "gunzip"
1932   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1933                                import_script, logfile)
1934
1935   final_result = []
1936   for idx, image in enumerate(src_images):
1937     if image:
1938       destcmd = utils.BuildShellCmd('cat %s', image)
1939       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1940                                                        constants.GANETI_RUNAS,
1941                                                        destcmd)
1942       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1943       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1944       import_env['IMPORT_INDEX'] = str(idx)
1945       result = utils.RunCmd(command, env=import_env)
1946       if result.failed:
1947         logging.error("Disk import command '%s' returned error: %s"
1948                       " output: %s", command, result.fail_reason,
1949                       result.output)
1950         final_result.append(False)
1951       else:
1952         final_result.append(True)
1953     else:
1954       final_result.append(True)
1955
1956   return final_result
1957
1958
1959 def ListExports():
1960   """Return a list of exports currently available on this machine.
1961
1962   @rtype: list
1963   @return: list of the exports
1964
1965   """
1966   if os.path.isdir(constants.EXPORT_DIR):
1967     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1968   else:
1969     return []
1970
1971
1972 def RemoveExport(export):
1973   """Remove an existing export from the node.
1974
1975   @type export: str
1976   @param export: the name of the export to remove
1977   @rtype: boolean
1978   @return: the success of the operation
1979
1980   """
1981   target = os.path.join(constants.EXPORT_DIR, export)
1982
1983   shutil.rmtree(target)
1984   # TODO: catch some of the relevant exceptions and provide a pretty
1985   # error message if rmtree fails.
1986
1987   return True
1988
1989
1990 def BlockdevRename(devlist):
1991   """Rename a list of block devices.
1992
1993   @type devlist: list of tuples
1994   @param devlist: list of tuples of the form  (disk,
1995       new_logical_id, new_physical_id); disk is an
1996       L{objects.Disk} object describing the current disk,
1997       and new logical_id/physical_id is the name we
1998       rename it to
1999   @rtype: boolean
2000   @return: True if all renames succeeded, False otherwise
2001
2002   """
2003   msgs = []
2004   result = True
2005   for disk, unique_id in devlist:
2006     dev = _RecursiveFindBD(disk)
2007     if dev is None:
2008       msgs.append("Can't find device %s in rename" % str(disk))
2009       result = False
2010       continue
2011     try:
2012       old_rpath = dev.dev_path
2013       dev.Rename(unique_id)
2014       new_rpath = dev.dev_path
2015       if old_rpath != new_rpath:
2016         DevCacheManager.RemoveCache(old_rpath)
2017         # FIXME: we should add the new cache information here, like:
2018         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2019         # but we don't have the owner here - maybe parse from existing
2020         # cache? for now, we only lose lvm data when we rename, which
2021         # is less critical than DRBD or MD
2022     except errors.BlockDeviceError, err:
2023       msgs.append("Can't rename device '%s' to '%s': %s" %
2024                   (dev, unique_id, err))
2025       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2026       result = False
2027   return (result, "; ".join(msgs))
2028
2029
2030 def _TransformFileStorageDir(file_storage_dir):
2031   """Checks whether given file_storage_dir is valid.
2032
2033   Checks wheter the given file_storage_dir is within the cluster-wide
2034   default file_storage_dir stored in SimpleStore. Only paths under that
2035   directory are allowed.
2036
2037   @type file_storage_dir: str
2038   @param file_storage_dir: the path to check
2039
2040   @return: the normalized path if valid, None otherwise
2041
2042   """
2043   cfg = _GetConfig()
2044   file_storage_dir = os.path.normpath(file_storage_dir)
2045   base_file_storage_dir = cfg.GetFileStorageDir()
2046   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2047       base_file_storage_dir):
2048     logging.error("file storage directory '%s' is not under base file"
2049                   " storage directory '%s'",
2050                   file_storage_dir, base_file_storage_dir)
2051     return None
2052   return file_storage_dir
2053
2054
2055 def CreateFileStorageDir(file_storage_dir):
2056   """Create file storage directory.
2057
2058   @type file_storage_dir: str
2059   @param file_storage_dir: directory to create
2060
2061   @rtype: tuple
2062   @return: tuple with first element a boolean indicating wheter dir
2063       creation was successful or not
2064
2065   """
2066   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2067   result = True,
2068   if not file_storage_dir:
2069     result = False,
2070   else:
2071     if os.path.exists(file_storage_dir):
2072       if not os.path.isdir(file_storage_dir):
2073         logging.error("'%s' is not a directory", file_storage_dir)
2074         result = False,
2075     else:
2076       try:
2077         os.makedirs(file_storage_dir, 0750)
2078       except OSError, err:
2079         logging.error("Cannot create file storage directory '%s': %s",
2080                       file_storage_dir, err)
2081         result = False,
2082   return result
2083
2084
2085 def RemoveFileStorageDir(file_storage_dir):
2086   """Remove file storage directory.
2087
2088   Remove it only if it's empty. If not log an error and return.
2089
2090   @type file_storage_dir: str
2091   @param file_storage_dir: the directory we should cleanup
2092   @rtype: tuple (success,)
2093   @return: tuple of one element, C{success}, denoting
2094       whether the operation was successfull
2095
2096   """
2097   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2098   result = True,
2099   if not file_storage_dir:
2100     result = False,
2101   else:
2102     if os.path.exists(file_storage_dir):
2103       if not os.path.isdir(file_storage_dir):
2104         logging.error("'%s' is not a directory", file_storage_dir)
2105         result = False,
2106       # deletes dir only if empty, otherwise we want to return False
2107       try:
2108         os.rmdir(file_storage_dir)
2109       except OSError, err:
2110         logging.exception("Cannot remove file storage directory '%s'",
2111                           file_storage_dir)
2112         result = False,
2113   return result
2114
2115
2116 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2117   """Rename the file storage directory.
2118
2119   @type old_file_storage_dir: str
2120   @param old_file_storage_dir: the current path
2121   @type new_file_storage_dir: str
2122   @param new_file_storage_dir: the name we should rename to
2123   @rtype: tuple (success,)
2124   @return: tuple of one element, C{success}, denoting
2125       whether the operation was successful
2126
2127   """
2128   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2129   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2130   result = True,
2131   if not old_file_storage_dir or not new_file_storage_dir:
2132     result = False,
2133   else:
2134     if not os.path.exists(new_file_storage_dir):
2135       if os.path.isdir(old_file_storage_dir):
2136         try:
2137           os.rename(old_file_storage_dir, new_file_storage_dir)
2138         except OSError, err:
2139           logging.exception("Cannot rename '%s' to '%s'",
2140                             old_file_storage_dir, new_file_storage_dir)
2141           result =  False,
2142       else:
2143         logging.error("'%s' is not a directory", old_file_storage_dir)
2144         result = False,
2145     else:
2146       if os.path.exists(old_file_storage_dir):
2147         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2148                       old_file_storage_dir, new_file_storage_dir)
2149         result = False,
2150   return result
2151
2152
2153 def _IsJobQueueFile(file_name):
2154   """Checks whether the given filename is in the queue directory.
2155
2156   @type file_name: str
2157   @param file_name: the file name we should check
2158   @rtype: boolean
2159   @return: whether the file is under the queue directory
2160
2161   """
2162   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2163   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2164
2165   if not result:
2166     logging.error("'%s' is not a file in the queue directory",
2167                   file_name)
2168
2169   return result
2170
2171
2172 def JobQueueUpdate(file_name, content):
2173   """Updates a file in the queue directory.
2174
2175   This is just a wrapper over L{utils.WriteFile}, with proper
2176   checking.
2177
2178   @type file_name: str
2179   @param file_name: the job file name
2180   @type content: str
2181   @param content: the new job contents
2182   @rtype: boolean
2183   @return: the success of the operation
2184
2185   """
2186   if not _IsJobQueueFile(file_name):
2187     return False
2188
2189   # Write and replace the file atomically
2190   utils.WriteFile(file_name, data=_Decompress(content))
2191
2192   return True
2193
2194
2195 def JobQueueRename(old, new):
2196   """Renames a job queue file.
2197
2198   This is just a wrapper over os.rename with proper checking.
2199
2200   @type old: str
2201   @param old: the old (actual) file name
2202   @type new: str
2203   @param new: the desired file name
2204   @rtype: boolean
2205   @return: the success of the operation
2206
2207   """
2208   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2209     return False
2210
2211   utils.RenameFile(old, new, mkdir=True)
2212
2213   return True
2214
2215
2216 def JobQueueSetDrainFlag(drain_flag):
2217   """Set the drain flag for the queue.
2218
2219   This will set or unset the queue drain flag.
2220
2221   @type drain_flag: boolean
2222   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2223   @rtype: boolean
2224   @return: always True
2225   @warning: the function always returns True
2226
2227   """
2228   if drain_flag:
2229     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2230   else:
2231     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2232
2233   return True
2234
2235
2236 def BlockdevClose(instance_name, disks):
2237   """Closes the given block devices.
2238
2239   This means they will be switched to secondary mode (in case of
2240   DRBD).
2241
2242   @param instance_name: if the argument is not empty, the symlinks
2243       of this instance will be removed
2244   @type disks: list of L{objects.Disk}
2245   @param disks: the list of disks to be closed
2246   @rtype: tuple (success, message)
2247   @return: a tuple of success and message, where success
2248       indicates the succes of the operation, and message
2249       which will contain the error details in case we
2250       failed
2251
2252   """
2253   bdevs = []
2254   for cf in disks:
2255     rd = _RecursiveFindBD(cf)
2256     if rd is None:
2257       _Fail("Can't find device %s", cf)
2258     bdevs.append(rd)
2259
2260   msg = []
2261   for rd in bdevs:
2262     try:
2263       rd.Close()
2264     except errors.BlockDeviceError, err:
2265       msg.append(str(err))
2266   if msg:
2267     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2268   else:
2269     if instance_name:
2270       _RemoveBlockDevLinks(instance_name, disks)
2271     return (True, "All devices secondary")
2272
2273
2274 def ValidateHVParams(hvname, hvparams):
2275   """Validates the given hypervisor parameters.
2276
2277   @type hvname: string
2278   @param hvname: the hypervisor name
2279   @type hvparams: dict
2280   @param hvparams: the hypervisor parameters to be validated
2281   @rtype: tuple (success, message)
2282   @return: a tuple of success and message, where success
2283       indicates the succes of the operation, and message
2284       which will contain the error details in case we
2285       failed
2286
2287   """
2288   try:
2289     hv_type = hypervisor.GetHypervisor(hvname)
2290     hv_type.ValidateParameters(hvparams)
2291     return (True, "Validation passed")
2292   except errors.HypervisorError, err:
2293     return (False, str(err))
2294
2295
2296 def DemoteFromMC():
2297   """Demotes the current node from master candidate role.
2298
2299   """
2300   # try to ensure we're not the master by mistake
2301   master, myself = ssconf.GetMasterAndMyself()
2302   if master == myself:
2303     return (False, "ssconf status shows I'm the master node, will not demote")
2304   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2305   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2306     return (False, "The master daemon is running, will not demote")
2307   try:
2308     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2309   except EnvironmentError, err:
2310     if err.errno != errno.ENOENT:
2311       return (False, "Error while backing up cluster file: %s" % str(err))
2312   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2313   return (True, "Done")
2314
2315
2316 def _FindDisks(nodes_ip, disks):
2317   """Sets the physical ID on disks and returns the block devices.
2318
2319   """
2320   # set the correct physical ID
2321   my_name = utils.HostInfo().name
2322   for cf in disks:
2323     cf.SetPhysicalID(my_name, nodes_ip)
2324
2325   bdevs = []
2326
2327   for cf in disks:
2328     rd = _RecursiveFindBD(cf)
2329     if rd is None:
2330       return (False, "Can't find device %s" % cf)
2331     bdevs.append(rd)
2332   return (True, bdevs)
2333
2334
2335 def DrbdDisconnectNet(nodes_ip, disks):
2336   """Disconnects the network on a list of drbd devices.
2337
2338   """
2339   status, bdevs = _FindDisks(nodes_ip, disks)
2340   if not status:
2341     return status, bdevs
2342
2343   # disconnect disks
2344   for rd in bdevs:
2345     try:
2346       rd.DisconnectNet()
2347     except errors.BlockDeviceError, err:
2348       _Fail("Can't change network configuration to standalone mode: %s",
2349             err, exc=True)
2350   return (True, "All disks are now disconnected")
2351
2352
2353 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2354   """Attaches the network on a list of drbd devices.
2355
2356   """
2357   status, bdevs = _FindDisks(nodes_ip, disks)
2358   if not status:
2359     return status, bdevs
2360
2361   if multimaster:
2362     for idx, rd in enumerate(bdevs):
2363       try:
2364         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2365       except EnvironmentError, err:
2366         _Fail("Can't create symlink: %s", err)
2367   # reconnect disks, switch to new master configuration and if
2368   # needed primary mode
2369   for rd in bdevs:
2370     try:
2371       rd.AttachNet(multimaster)
2372     except errors.BlockDeviceError, err:
2373       _Fail("Can't change network configuration: %s", err)
2374   # wait until the disks are connected; we need to retry the re-attach
2375   # if the device becomes standalone, as this might happen if the one
2376   # node disconnects and reconnects in a different mode before the
2377   # other node reconnects; in this case, one or both of the nodes will
2378   # decide it has wrong configuration and switch to standalone
2379   RECONNECT_TIMEOUT = 2 * 60
2380   sleep_time = 0.100 # start with 100 miliseconds
2381   timeout_limit = time.time() + RECONNECT_TIMEOUT
2382   while time.time() < timeout_limit:
2383     all_connected = True
2384     for rd in bdevs:
2385       stats = rd.GetProcStatus()
2386       if not (stats.is_connected or stats.is_in_resync):
2387         all_connected = False
2388       if stats.is_standalone:
2389         # peer had different config info and this node became
2390         # standalone, even though this should not happen with the
2391         # new staged way of changing disk configs
2392         try:
2393           rd.ReAttachNet(multimaster)
2394         except errors.BlockDeviceError, err:
2395           _Fail("Can't change network configuration: %s", err)
2396     if all_connected:
2397       break
2398     time.sleep(sleep_time)
2399     sleep_time = min(5, sleep_time * 1.5)
2400   if not all_connected:
2401     return (False, "Timeout in disk reconnecting")
2402   if multimaster:
2403     # change to primary mode
2404     for rd in bdevs:
2405       try:
2406         rd.Open()
2407       except errors.BlockDeviceError, err:
2408         _Fail("Can't change to primary mode: %s", err)
2409   if multimaster:
2410     msg = "multi-master and primary"
2411   else:
2412     msg = "single-master"
2413   return (True, "Disks are now configured as %s" % msg)
2414
2415
2416 def DrbdWaitSync(nodes_ip, disks):
2417   """Wait until DRBDs have synchronized.
2418
2419   """
2420   status, bdevs = _FindDisks(nodes_ip, disks)
2421   if not status:
2422     return status, bdevs
2423
2424   min_resync = 100
2425   alldone = True
2426   failure = False
2427   for rd in bdevs:
2428     stats = rd.GetProcStatus()
2429     if not (stats.is_connected or stats.is_in_resync):
2430       failure = True
2431       break
2432     alldone = alldone and (not stats.is_in_resync)
2433     if stats.sync_percent is not None:
2434       min_resync = min(min_resync, stats.sync_percent)
2435   return (not failure, (alldone, min_resync))
2436
2437
2438 def PowercycleNode(hypervisor_type):
2439   """Hard-powercycle the node.
2440
2441   Because we need to return first, and schedule the powercycle in the
2442   background, we won't be able to report failures nicely.
2443
2444   """
2445   hyper = hypervisor.GetHypervisor(hypervisor_type)
2446   try:
2447     pid = os.fork()
2448   except OSError, err:
2449     # if we can't fork, we'll pretend that we're in the child process
2450     pid = 0
2451   if pid > 0:
2452     return (True, "Reboot scheduled in 5 seconds")
2453   time.sleep(5)
2454   hyper.PowercycleNode()
2455
2456
2457 class HooksRunner(object):
2458   """Hook runner.
2459
2460   This class is instantiated on the node side (ganeti-noded) and not
2461   on the master side.
2462
2463   """
2464   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2465
2466   def __init__(self, hooks_base_dir=None):
2467     """Constructor for hooks runner.
2468
2469     @type hooks_base_dir: str or None
2470     @param hooks_base_dir: if not None, this overrides the
2471         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2472
2473     """
2474     if hooks_base_dir is None:
2475       hooks_base_dir = constants.HOOKS_BASE_DIR
2476     self._BASE_DIR = hooks_base_dir
2477
2478   @staticmethod
2479   def ExecHook(script, env):
2480     """Exec one hook script.
2481
2482     @type script: str
2483     @param script: the full path to the script
2484     @type env: dict
2485     @param env: the environment with which to exec the script
2486     @rtype: tuple (success, message)
2487     @return: a tuple of success and message, where success
2488         indicates the succes of the operation, and message
2489         which will contain the error details in case we
2490         failed
2491
2492     """
2493     # exec the process using subprocess and log the output
2494     fdstdin = None
2495     try:
2496       fdstdin = open("/dev/null", "r")
2497       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2498                                stderr=subprocess.STDOUT, close_fds=True,
2499                                shell=False, cwd="/", env=env)
2500       output = ""
2501       try:
2502         output = child.stdout.read(4096)
2503         child.stdout.close()
2504       except EnvironmentError, err:
2505         output += "Hook script error: %s" % str(err)
2506
2507       while True:
2508         try:
2509           result = child.wait()
2510           break
2511         except EnvironmentError, err:
2512           if err.errno == errno.EINTR:
2513             continue
2514           raise
2515     finally:
2516       # try not to leak fds
2517       for fd in (fdstdin, ):
2518         if fd is not None:
2519           try:
2520             fd.close()
2521           except EnvironmentError, err:
2522             # just log the error
2523             #logging.exception("Error while closing fd %s", fd)
2524             pass
2525
2526     return result == 0, utils.SafeEncode(output.strip())
2527
2528   def RunHooks(self, hpath, phase, env):
2529     """Run the scripts in the hooks directory.
2530
2531     @type hpath: str
2532     @param hpath: the path to the hooks directory which
2533         holds the scripts
2534     @type phase: str
2535     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2536         L{constants.HOOKS_PHASE_POST}
2537     @type env: dict
2538     @param env: dictionary with the environment for the hook
2539     @rtype: list
2540     @return: list of 3-element tuples:
2541       - script path
2542       - script result, either L{constants.HKR_SUCCESS} or
2543         L{constants.HKR_FAIL}
2544       - output of the script
2545
2546     @raise errors.ProgrammerError: for invalid input
2547         parameters
2548
2549     """
2550     if phase == constants.HOOKS_PHASE_PRE:
2551       suffix = "pre"
2552     elif phase == constants.HOOKS_PHASE_POST:
2553       suffix = "post"
2554     else:
2555       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2556     rr = []
2557
2558     subdir = "%s-%s.d" % (hpath, suffix)
2559     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2560     try:
2561       dir_contents = utils.ListVisibleFiles(dir_name)
2562     except OSError, err:
2563       # FIXME: must log output in case of failures
2564       return rr
2565
2566     # we use the standard python sort order,
2567     # so 00name is the recommended naming scheme
2568     dir_contents.sort()
2569     for relname in dir_contents:
2570       fname = os.path.join(dir_name, relname)
2571       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2572           self.RE_MASK.match(relname) is not None):
2573         rrval = constants.HKR_SKIP
2574         output = ""
2575       else:
2576         result, output = self.ExecHook(fname, env)
2577         if not result:
2578           rrval = constants.HKR_FAIL
2579         else:
2580           rrval = constants.HKR_SUCCESS
2581       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2582
2583     return rr
2584
2585
2586 class IAllocatorRunner(object):
2587   """IAllocator runner.
2588
2589   This class is instantiated on the node side (ganeti-noded) and not on
2590   the master side.
2591
2592   """
2593   def Run(self, name, idata):
2594     """Run an iallocator script.
2595
2596     @type name: str
2597     @param name: the iallocator script name
2598     @type idata: str
2599     @param idata: the allocator input data
2600
2601     @rtype: tuple
2602     @return: four element tuple of:
2603        - run status (one of the IARUN_ constants)
2604        - stdout
2605        - stderr
2606        - fail reason (as from L{utils.RunResult})
2607
2608     """
2609     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2610                                   os.path.isfile)
2611     if alloc_script is None:
2612       return (constants.IARUN_NOTFOUND, None, None, None)
2613
2614     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2615     try:
2616       os.write(fd, idata)
2617       os.close(fd)
2618       result = utils.RunCmd([alloc_script, fin_name])
2619       if result.failed:
2620         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2621                 result.fail_reason)
2622     finally:
2623       os.unlink(fin_name)
2624
2625     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2626
2627
2628 class DevCacheManager(object):
2629   """Simple class for managing a cache of block device information.
2630
2631   """
2632   _DEV_PREFIX = "/dev/"
2633   _ROOT_DIR = constants.BDEV_CACHE_DIR
2634
2635   @classmethod
2636   def _ConvertPath(cls, dev_path):
2637     """Converts a /dev/name path to the cache file name.
2638
2639     This replaces slashes with underscores and strips the /dev
2640     prefix. It then returns the full path to the cache file.
2641
2642     @type dev_path: str
2643     @param dev_path: the C{/dev/} path name
2644     @rtype: str
2645     @return: the converted path name
2646
2647     """
2648     if dev_path.startswith(cls._DEV_PREFIX):
2649       dev_path = dev_path[len(cls._DEV_PREFIX):]
2650     dev_path = dev_path.replace("/", "_")
2651     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2652     return fpath
2653
2654   @classmethod
2655   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2656     """Updates the cache information for a given device.
2657
2658     @type dev_path: str
2659     @param dev_path: the pathname of the device
2660     @type owner: str
2661     @param owner: the owner (instance name) of the device
2662     @type on_primary: bool
2663     @param on_primary: whether this is the primary
2664         node nor not
2665     @type iv_name: str
2666     @param iv_name: the instance-visible name of the
2667         device, as in objects.Disk.iv_name
2668
2669     @rtype: None
2670
2671     """
2672     if dev_path is None:
2673       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2674       return
2675     fpath = cls._ConvertPath(dev_path)
2676     if on_primary:
2677       state = "primary"
2678     else:
2679       state = "secondary"
2680     if iv_name is None:
2681       iv_name = "not_visible"
2682     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2683     try:
2684       utils.WriteFile(fpath, data=fdata)
2685     except EnvironmentError, err:
2686       logging.exception("Can't update bdev cache for %s", dev_path)
2687
2688   @classmethod
2689   def RemoveCache(cls, dev_path):
2690     """Remove data for a dev_path.
2691
2692     This is just a wrapper over L{utils.RemoveFile} with a converted
2693     path name and logging.
2694
2695     @type dev_path: str
2696     @param dev_path: the pathname of the device
2697
2698     @rtype: None
2699
2700     """
2701     if dev_path is None:
2702       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2703       return
2704     fpath = cls._ConvertPath(dev_path)
2705     try:
2706       utils.RemoveFile(fpath)
2707     except EnvironmentError, err:
2708       logging.exception("Can't update bdev cache for %s", dev_path)