Convert instance_os_import rpc to new style result
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 class RPCFail(Exception):
50   """Class denoting RPC failure.
51
52   Its argument is the error message.
53
54   """
55
56 def _Fail(msg, *args, **kwargs):
57   """Log an error and the raise an RPCFail exception.
58
59   This exception is then handled specially in the ganeti daemon and
60   turned into a 'failed' return type. As such, this function is a
61   useful shortcut for logging the error and returning it to the master
62   daemon.
63
64   @type msg: string
65   @param msg: the text of the exception
66   @raise RPCFail
67
68   """
69   if args:
70     msg = msg % args
71   if "exc" in kwargs and kwargs["exc"]:
72     logging.exception(msg)
73   else:
74     logging.error(msg)
75   raise RPCFail(msg)
76
77
78 def _GetConfig():
79   """Simple wrapper to return a SimpleStore.
80
81   @rtype: L{ssconf.SimpleStore}
82   @return: a SimpleStore instance
83
84   """
85   return ssconf.SimpleStore()
86
87
88 def _GetSshRunner(cluster_name):
89   """Simple wrapper to return an SshRunner.
90
91   @type cluster_name: str
92   @param cluster_name: the cluster name, which is needed
93       by the SshRunner constructor
94   @rtype: L{ssh.SshRunner}
95   @return: an SshRunner instance
96
97   """
98   return ssh.SshRunner(cluster_name)
99
100
101 def _Decompress(data):
102   """Unpacks data compressed by the RPC client.
103
104   @type data: list or tuple
105   @param data: Data sent by RPC client
106   @rtype: str
107   @return: Decompressed data
108
109   """
110   assert isinstance(data, (list, tuple))
111   assert len(data) == 2
112   (encoding, content) = data
113   if encoding == constants.RPC_ENCODING_NONE:
114     return content
115   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
116     return zlib.decompress(base64.b64decode(content))
117   else:
118     raise AssertionError("Unknown data encoding")
119
120
121 def _CleanDirectory(path, exclude=None):
122   """Removes all regular files in a directory.
123
124   @type path: str
125   @param path: the directory to clean
126   @type exclude: list
127   @param exclude: list of files to be excluded, defaults
128       to the empty list
129
130   """
131   if not os.path.isdir(path):
132     return
133   if exclude is None:
134     exclude = []
135   else:
136     # Normalize excluded paths
137     exclude = [os.path.normpath(i) for i in exclude]
138
139   for rel_name in utils.ListVisibleFiles(path):
140     full_name = os.path.normpath(os.path.join(path, rel_name))
141     if full_name in exclude:
142       continue
143     if os.path.isfile(full_name) and not os.path.islink(full_name):
144       utils.RemoveFile(full_name)
145
146
147 def JobQueuePurge():
148   """Removes job queue files and archived jobs.
149
150   @rtype: None
151
152   """
153   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
154   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
155
156
157 def GetMasterInfo():
158   """Returns master information.
159
160   This is an utility function to compute master information, either
161   for consumption here or from the node daemon.
162
163   @rtype: tuple
164   @return: (master_netdev, master_ip, master_name) if we have a good
165       configuration, otherwise (None, None, None)
166
167   """
168   try:
169     cfg = _GetConfig()
170     master_netdev = cfg.GetMasterNetdev()
171     master_ip = cfg.GetMasterIP()
172     master_node = cfg.GetMasterNode()
173   except errors.ConfigurationError, err:
174     logging.exception("Cluster configuration incomplete")
175     return (None, None, None)
176   return (master_netdev, master_ip, master_node)
177
178
179 def StartMaster(start_daemons):
180   """Activate local node as master node.
181
182   The function will always try activate the IP address of the master
183   (unless someone else has it). It will also start the master daemons,
184   based on the start_daemons parameter.
185
186   @type start_daemons: boolean
187   @param start_daemons: whther to also start the master
188       daemons (ganeti-masterd and ganeti-rapi)
189   @rtype: None
190
191   """
192   ok = True
193   master_netdev, master_ip, _ = GetMasterInfo()
194   if not master_netdev:
195     return False
196
197   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
198     if utils.OwnIpAddress(master_ip):
199       # we already have the ip:
200       logging.debug("Already started")
201     else:
202       logging.error("Someone else has the master ip, not activating")
203       ok = False
204   else:
205     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
206                            "dev", master_netdev, "label",
207                            "%s:0" % master_netdev])
208     if result.failed:
209       logging.error("Can't activate master IP: %s", result.output)
210       ok = False
211
212     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
213                            "-s", master_ip, master_ip])
214     # we'll ignore the exit code of arping
215
216   # and now start the master and rapi daemons
217   if start_daemons:
218     for daemon in 'ganeti-masterd', 'ganeti-rapi':
219       result = utils.RunCmd([daemon])
220       if result.failed:
221         logging.error("Can't start daemon %s: %s", daemon, result.output)
222         ok = False
223   return ok
224
225
226 def StopMaster(stop_daemons):
227   """Deactivate this node as master.
228
229   The function will always try to deactivate the IP address of the
230   master. It will also stop the master daemons depending on the
231   stop_daemons parameter.
232
233   @type stop_daemons: boolean
234   @param stop_daemons: whether to also stop the master daemons
235       (ganeti-masterd and ganeti-rapi)
236   @rtype: None
237
238   """
239   master_netdev, master_ip, _ = GetMasterInfo()
240   if not master_netdev:
241     return False
242
243   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
244                          "dev", master_netdev])
245   if result.failed:
246     logging.error("Can't remove the master IP, error: %s", result.output)
247     # but otherwise ignore the failure
248
249   if stop_daemons:
250     # stop/kill the rapi and the master daemon
251     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
252       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
253
254   return True
255
256
257 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
258   """Joins this node to the cluster.
259
260   This does the following:
261       - updates the hostkeys of the machine (rsa and dsa)
262       - adds the ssh private key to the user
263       - adds the ssh public key to the users' authorized_keys file
264
265   @type dsa: str
266   @param dsa: the DSA private key to write
267   @type dsapub: str
268   @param dsapub: the DSA public key to write
269   @type rsa: str
270   @param rsa: the RSA private key to write
271   @type rsapub: str
272   @param rsapub: the RSA public key to write
273   @type sshkey: str
274   @param sshkey: the SSH private key to write
275   @type sshpub: str
276   @param sshpub: the SSH public key to write
277   @rtype: boolean
278   @return: the success of the operation
279
280   """
281   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
282                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
283                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
284                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
285   for name, content, mode in sshd_keys:
286     utils.WriteFile(name, data=content, mode=mode)
287
288   try:
289     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
290                                                     mkdir=True)
291   except errors.OpExecError, err:
292     _Fail("Error while processing user ssh files: %s", err, exc=True)
293
294   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
295     utils.WriteFile(name, data=content, mode=0600)
296
297   utils.AddAuthorizedKey(auth_keys, sshpub)
298
299   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
300
301   return (True, "Node added successfully")
302
303
304 def LeaveCluster():
305   """Cleans up and remove the current node.
306
307   This function cleans up and prepares the current node to be removed
308   from the cluster.
309
310   If processing is successful, then it raises an
311   L{errors.QuitGanetiException} which is used as a special case to
312   shutdown the node daemon.
313
314   """
315   _CleanDirectory(constants.DATA_DIR)
316   JobQueuePurge()
317
318   try:
319     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
320   except errors.OpExecError:
321     logging.exception("Error while processing ssh files")
322     return
323
324   f = open(pub_key, 'r')
325   try:
326     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
327   finally:
328     f.close()
329
330   utils.RemoveFile(priv_key)
331   utils.RemoveFile(pub_key)
332
333   # Return a reassuring string to the caller, and quit
334   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
335
336
337 def GetNodeInfo(vgname, hypervisor_type):
338   """Gives back a hash with different informations about the node.
339
340   @type vgname: C{string}
341   @param vgname: the name of the volume group to ask for disk space information
342   @type hypervisor_type: C{str}
343   @param hypervisor_type: the name of the hypervisor to ask for
344       memory information
345   @rtype: C{dict}
346   @return: dictionary with the following keys:
347       - vg_size is the size of the configured volume group in MiB
348       - vg_free is the free size of the volume group in MiB
349       - memory_dom0 is the memory allocated for domain0 in MiB
350       - memory_free is the currently available (free) ram in MiB
351       - memory_total is the total number of ram in MiB
352
353   """
354   outputarray = {}
355   vginfo = _GetVGInfo(vgname)
356   outputarray['vg_size'] = vginfo['vg_size']
357   outputarray['vg_free'] = vginfo['vg_free']
358
359   hyper = hypervisor.GetHypervisor(hypervisor_type)
360   hyp_info = hyper.GetNodeInfo()
361   if hyp_info is not None:
362     outputarray.update(hyp_info)
363
364   f = open("/proc/sys/kernel/random/boot_id", 'r')
365   try:
366     outputarray["bootid"] = f.read(128).rstrip("\n")
367   finally:
368     f.close()
369
370   return outputarray
371
372
373 def VerifyNode(what, cluster_name):
374   """Verify the status of the local node.
375
376   Based on the input L{what} parameter, various checks are done on the
377   local node.
378
379   If the I{filelist} key is present, this list of
380   files is checksummed and the file/checksum pairs are returned.
381
382   If the I{nodelist} key is present, we check that we have
383   connectivity via ssh with the target nodes (and check the hostname
384   report).
385
386   If the I{node-net-test} key is present, we check that we have
387   connectivity to the given nodes via both primary IP and, if
388   applicable, secondary IPs.
389
390   @type what: C{dict}
391   @param what: a dictionary of things to check:
392       - filelist: list of files for which to compute checksums
393       - nodelist: list of nodes we should check ssh communication with
394       - node-net-test: list of nodes we should check node daemon port
395         connectivity with
396       - hypervisor: list with hypervisors to run the verify for
397   @rtype: dict
398   @return: a dictionary with the same keys as the input dict, and
399       values representing the result of the checks
400
401   """
402   result = {}
403
404   if constants.NV_HYPERVISOR in what:
405     result[constants.NV_HYPERVISOR] = tmp = {}
406     for hv_name in what[constants.NV_HYPERVISOR]:
407       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
408
409   if constants.NV_FILELIST in what:
410     result[constants.NV_FILELIST] = utils.FingerprintFiles(
411       what[constants.NV_FILELIST])
412
413   if constants.NV_NODELIST in what:
414     result[constants.NV_NODELIST] = tmp = {}
415     random.shuffle(what[constants.NV_NODELIST])
416     for node in what[constants.NV_NODELIST]:
417       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
418       if not success:
419         tmp[node] = message
420
421   if constants.NV_NODENETTEST in what:
422     result[constants.NV_NODENETTEST] = tmp = {}
423     my_name = utils.HostInfo().name
424     my_pip = my_sip = None
425     for name, pip, sip in what[constants.NV_NODENETTEST]:
426       if name == my_name:
427         my_pip = pip
428         my_sip = sip
429         break
430     if not my_pip:
431       tmp[my_name] = ("Can't find my own primary/secondary IP"
432                       " in the node list")
433     else:
434       port = utils.GetNodeDaemonPort()
435       for name, pip, sip in what[constants.NV_NODENETTEST]:
436         fail = []
437         if not utils.TcpPing(pip, port, source=my_pip):
438           fail.append("primary")
439         if sip != pip:
440           if not utils.TcpPing(sip, port, source=my_sip):
441             fail.append("secondary")
442         if fail:
443           tmp[name] = ("failure using the %s interface(s)" %
444                        " and ".join(fail))
445
446   if constants.NV_LVLIST in what:
447     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
448
449   if constants.NV_INSTANCELIST in what:
450     result[constants.NV_INSTANCELIST] = GetInstanceList(
451       what[constants.NV_INSTANCELIST])
452
453   if constants.NV_VGLIST in what:
454     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
455
456   if constants.NV_VERSION in what:
457     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
458                                     constants.RELEASE_VERSION)
459
460   if constants.NV_HVINFO in what:
461     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
462     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
463
464   if constants.NV_DRBDLIST in what:
465     try:
466       used_minors = bdev.DRBD8.GetUsedDevs().keys()
467     except errors.BlockDeviceError, err:
468       logging.warning("Can't get used minors list", exc_info=True)
469       used_minors = str(err)
470     result[constants.NV_DRBDLIST] = used_minors
471
472   return result
473
474
475 def GetVolumeList(vg_name):
476   """Compute list of logical volumes and their size.
477
478   @type vg_name: str
479   @param vg_name: the volume group whose LVs we should list
480   @rtype: dict
481   @return:
482       dictionary of all partions (key) with value being a tuple of
483       their size (in MiB), inactive and online status::
484
485         {'test1': ('20.06', True, True)}
486
487       in case of errors, a string is returned with the error
488       details.
489
490   """
491   lvs = {}
492   sep = '|'
493   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
494                          "--separator=%s" % sep,
495                          "-olv_name,lv_size,lv_attr", vg_name])
496   if result.failed:
497     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
498
499   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
500   for line in result.stdout.splitlines():
501     line = line.strip()
502     match = valid_line_re.match(line)
503     if not match:
504       logging.error("Invalid line returned from lvs output: '%s'", line)
505       continue
506     name, size, attr = match.groups()
507     inactive = attr[4] == '-'
508     online = attr[5] == 'o'
509     lvs[name] = (size, inactive, online)
510
511   return lvs
512
513
514 def ListVolumeGroups():
515   """List the volume groups and their size.
516
517   @rtype: dict
518   @return: dictionary with keys volume name and values the
519       size of the volume
520
521   """
522   return True, utils.ListVolumeGroups()
523
524
525 def NodeVolumes():
526   """List all volumes on this node.
527
528   @rtype: list
529   @return:
530     A list of dictionaries, each having four keys:
531       - name: the logical volume name,
532       - size: the size of the logical volume
533       - dev: the physical device on which the LV lives
534       - vg: the volume group to which it belongs
535
536     In case of errors, we return an empty list and log the
537     error.
538
539     Note that since a logical volume can live on multiple physical
540     volumes, the resulting list might include a logical volume
541     multiple times.
542
543   """
544   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
545                          "--separator=|",
546                          "--options=lv_name,lv_size,devices,vg_name"])
547   if result.failed:
548     logging.error("Failed to list logical volumes, lvs output: %s",
549                   result.output)
550     return []
551
552   def parse_dev(dev):
553     if '(' in dev:
554       return dev.split('(')[0]
555     else:
556       return dev
557
558   def map_line(line):
559     return {
560       'name': line[0].strip(),
561       'size': line[1].strip(),
562       'dev': parse_dev(line[2].strip()),
563       'vg': line[3].strip(),
564     }
565
566   return [map_line(line.split('|')) for line in result.stdout.splitlines()
567           if line.count('|') >= 3]
568
569
570 def BridgesExist(bridges_list):
571   """Check if a list of bridges exist on the current node.
572
573   @rtype: boolean
574   @return: C{True} if all of them exist, C{False} otherwise
575
576   """
577   missing = []
578   for bridge in bridges_list:
579     if not utils.BridgeExists(bridge):
580       missing.append(bridge)
581
582   if missing:
583     return False, "Missing bridges %s" % (", ".join(missing),)
584
585   return True, None
586
587
588 def GetInstanceList(hypervisor_list):
589   """Provides a list of instances.
590
591   @type hypervisor_list: list
592   @param hypervisor_list: the list of hypervisors to query information
593
594   @rtype: list
595   @return: a list of all running instances on the current node
596     - instance1.example.com
597     - instance2.example.com
598
599   """
600   results = []
601   for hname in hypervisor_list:
602     try:
603       names = hypervisor.GetHypervisor(hname).ListInstances()
604       results.extend(names)
605     except errors.HypervisorError, err:
606       logging.exception("Error enumerating instances for hypevisor %s", hname)
607       raise
608
609   return results
610
611
612 def GetInstanceInfo(instance, hname):
613   """Gives back the informations about an instance as a dictionary.
614
615   @type instance: string
616   @param instance: the instance name
617   @type hname: string
618   @param hname: the hypervisor type of the instance
619
620   @rtype: dict
621   @return: dictionary with the following keys:
622       - memory: memory size of instance (int)
623       - state: xen state of instance (string)
624       - time: cpu time of instance (float)
625
626   """
627   output = {}
628
629   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
630   if iinfo is not None:
631     output['memory'] = iinfo[2]
632     output['state'] = iinfo[4]
633     output['time'] = iinfo[5]
634
635   return output
636
637
638 def GetInstanceMigratable(instance):
639   """Gives whether an instance can be migrated.
640
641   @type instance: L{objects.Instance}
642   @param instance: object representing the instance to be checked.
643
644   @rtype: tuple
645   @return: tuple of (result, description) where:
646       - result: whether the instance can be migrated or not
647       - description: a description of the issue, if relevant
648
649   """
650   hyper = hypervisor.GetHypervisor(instance.hypervisor)
651   if instance.name not in hyper.ListInstances():
652     return (False, 'not running')
653
654   for idx in range(len(instance.disks)):
655     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
656     if not os.path.islink(link_name):
657       return (False, 'not restarted since ganeti 1.2.5')
658
659   return (True, '')
660
661
662 def GetAllInstancesInfo(hypervisor_list):
663   """Gather data about all instances.
664
665   This is the equivalent of L{GetInstanceInfo}, except that it
666   computes data for all instances at once, thus being faster if one
667   needs data about more than one instance.
668
669   @type hypervisor_list: list
670   @param hypervisor_list: list of hypervisors to query for instance data
671
672   @rtype: dict
673   @return: dictionary of instance: data, with data having the following keys:
674       - memory: memory size of instance (int)
675       - state: xen state of instance (string)
676       - time: cpu time of instance (float)
677       - vcpus: the number of vcpus
678
679   """
680   output = {}
681
682   for hname in hypervisor_list:
683     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
684     if iinfo:
685       for name, inst_id, memory, vcpus, state, times in iinfo:
686         value = {
687           'memory': memory,
688           'vcpus': vcpus,
689           'state': state,
690           'time': times,
691           }
692         if name in output:
693           # we only check static parameters, like memory and vcpus,
694           # and not state and time which can change between the
695           # invocations of the different hypervisors
696           for key in 'memory', 'vcpus':
697             if value[key] != output[name][key]:
698               raise errors.HypervisorError("Instance %s is running twice"
699                                            " with different parameters" % name)
700         output[name] = value
701
702   return output
703
704
705 def InstanceOsAdd(instance, reinstall):
706   """Add an OS to an instance.
707
708   @type instance: L{objects.Instance}
709   @param instance: Instance whose OS is to be installed
710   @type reinstall: boolean
711   @param reinstall: whether this is an instance reinstall
712   @rtype: boolean
713   @return: the success of the operation
714
715   """
716   try:
717     inst_os = OSFromDisk(instance.os)
718   except errors.InvalidOS, err:
719     os_name, os_dir, os_err = err.args
720     if os_dir is None:
721       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
722     else:
723       return (False, "Error parsing OS '%s' in directory %s: %s" %
724               (os_name, os_dir, os_err))
725
726   create_env = OSEnvironment(instance)
727   if reinstall:
728     create_env['INSTANCE_REINSTALL'] = "1"
729
730   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
731                                      instance.name, int(time.time()))
732
733   result = utils.RunCmd([inst_os.create_script], env=create_env,
734                         cwd=inst_os.path, output=logfile,)
735   if result.failed:
736     logging.error("os create command '%s' returned error: %s, logfile: %s,"
737                   " output: %s", result.cmd, result.fail_reason, logfile,
738                   result.output)
739     lines = [utils.SafeEncode(val)
740              for val in utils.TailFile(logfile, lines=20)]
741     return (False, "OS create script failed (%s), last lines in the"
742             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
743
744   return (True, "Successfully installed")
745
746
747 def RunRenameInstance(instance, old_name):
748   """Run the OS rename script for an instance.
749
750   @type instance: L{objects.Instance}
751   @param instance: Instance whose OS is to be installed
752   @type old_name: string
753   @param old_name: previous instance name
754   @rtype: boolean
755   @return: the success of the operation
756
757   """
758   inst_os = OSFromDisk(instance.os)
759
760   rename_env = OSEnvironment(instance)
761   rename_env['OLD_INSTANCE_NAME'] = old_name
762
763   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
764                                            old_name,
765                                            instance.name, int(time.time()))
766
767   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
768                         cwd=inst_os.path, output=logfile)
769
770   if result.failed:
771     logging.error("os create command '%s' returned error: %s output: %s",
772                   result.cmd, result.fail_reason, result.output)
773     lines = [utils.SafeEncode(val)
774              for val in utils.TailFile(logfile, lines=20)]
775     return (False, "OS rename script failed (%s), last lines in the"
776             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
777
778   return (True, "Rename successful")
779
780
781 def _GetVGInfo(vg_name):
782   """Get informations about the volume group.
783
784   @type vg_name: str
785   @param vg_name: the volume group which we query
786   @rtype: dict
787   @return:
788     A dictionary with the following keys:
789       - C{vg_size} is the total size of the volume group in MiB
790       - C{vg_free} is the free size of the volume group in MiB
791       - C{pv_count} are the number of physical disks in that VG
792
793     If an error occurs during gathering of data, we return the same dict
794     with keys all set to None.
795
796   """
797   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
798
799   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
800                          "--nosuffix", "--units=m", "--separator=:", vg_name])
801
802   if retval.failed:
803     logging.error("volume group %s not present", vg_name)
804     return retdic
805   valarr = retval.stdout.strip().rstrip(':').split(':')
806   if len(valarr) == 3:
807     try:
808       retdic = {
809         "vg_size": int(round(float(valarr[0]), 0)),
810         "vg_free": int(round(float(valarr[1]), 0)),
811         "pv_count": int(valarr[2]),
812         }
813     except ValueError, err:
814       logging.exception("Fail to parse vgs output")
815   else:
816     logging.error("vgs output has the wrong number of fields (expected"
817                   " three): %s", str(valarr))
818   return retdic
819
820
821 def _GetBlockDevSymlinkPath(instance_name, idx):
822   return os.path.join(constants.DISK_LINKS_DIR,
823                       "%s:%d" % (instance_name, idx))
824
825
826 def _SymlinkBlockDev(instance_name, device_path, idx):
827   """Set up symlinks to a instance's block device.
828
829   This is an auxiliary function run when an instance is start (on the primary
830   node) or when an instance is migrated (on the target node).
831
832
833   @param instance_name: the name of the target instance
834   @param device_path: path of the physical block device, on the node
835   @param idx: the disk index
836   @return: absolute path to the disk's symlink
837
838   """
839   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
840   try:
841     os.symlink(device_path, link_name)
842   except OSError, err:
843     if err.errno == errno.EEXIST:
844       if (not os.path.islink(link_name) or
845           os.readlink(link_name) != device_path):
846         os.remove(link_name)
847         os.symlink(device_path, link_name)
848     else:
849       raise
850
851   return link_name
852
853
854 def _RemoveBlockDevLinks(instance_name, disks):
855   """Remove the block device symlinks belonging to the given instance.
856
857   """
858   for idx, disk in enumerate(disks):
859     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
860     if os.path.islink(link_name):
861       try:
862         os.remove(link_name)
863       except OSError:
864         logging.exception("Can't remove symlink '%s'", link_name)
865
866
867 def _GatherAndLinkBlockDevs(instance):
868   """Set up an instance's block device(s).
869
870   This is run on the primary node at instance startup. The block
871   devices must be already assembled.
872
873   @type instance: L{objects.Instance}
874   @param instance: the instance whose disks we shoul assemble
875   @rtype: list
876   @return: list of (disk_object, device_path)
877
878   """
879   block_devices = []
880   for idx, disk in enumerate(instance.disks):
881     device = _RecursiveFindBD(disk)
882     if device is None:
883       raise errors.BlockDeviceError("Block device '%s' is not set up." %
884                                     str(disk))
885     device.Open()
886     try:
887       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
888     except OSError, e:
889       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
890                                     e.strerror)
891
892     block_devices.append((disk, link_name))
893
894   return block_devices
895
896
897 def StartInstance(instance):
898   """Start an instance.
899
900   @type instance: L{objects.Instance}
901   @param instance: the instance object
902   @rtype: boolean
903   @return: whether the startup was successful or not
904
905   """
906   running_instances = GetInstanceList([instance.hypervisor])
907
908   if instance.name in running_instances:
909     return (True, "Already running")
910
911   try:
912     block_devices = _GatherAndLinkBlockDevs(instance)
913     hyper = hypervisor.GetHypervisor(instance.hypervisor)
914     hyper.StartInstance(instance, block_devices)
915   except errors.BlockDeviceError, err:
916     _Fail("Block device error: %s", err, exc=True)
917   except errors.HypervisorError, err:
918     _RemoveBlockDevLinks(instance.name, instance.disks)
919     _Fail("Hypervisor error: %s", err, exc=True)
920
921   return (True, "Instance started successfully")
922
923
924 def InstanceShutdown(instance):
925   """Shut an instance down.
926
927   @note: this functions uses polling with a hardcoded timeout.
928
929   @type instance: L{objects.Instance}
930   @param instance: the instance object
931   @rtype: boolean
932   @return: whether the startup was successful or not
933
934   """
935   hv_name = instance.hypervisor
936   running_instances = GetInstanceList([hv_name])
937
938   if instance.name not in running_instances:
939     return (True, "Instance already stopped")
940
941   hyper = hypervisor.GetHypervisor(hv_name)
942   try:
943     hyper.StopInstance(instance)
944   except errors.HypervisorError, err:
945     _Fail("Failed to stop instance %s: %s", instance.name, err)
946
947   # test every 10secs for 2min
948
949   time.sleep(1)
950   for dummy in range(11):
951     if instance.name not in GetInstanceList([hv_name]):
952       break
953     time.sleep(10)
954   else:
955     # the shutdown did not succeed
956     logging.error("Shutdown of '%s' unsuccessful, using destroy",
957                   instance.name)
958
959     try:
960       hyper.StopInstance(instance, force=True)
961     except errors.HypervisorError, err:
962       _Fail("Failed to force stop instance %s: %s", instance.name, err)
963
964     time.sleep(1)
965     if instance.name in GetInstanceList([hv_name]):
966       _Fail("Could not shutdown instance %s even by destroy", instance.name)
967
968   _RemoveBlockDevLinks(instance.name, instance.disks)
969
970   return (True, "Instance has been shutdown successfully")
971
972
973 def InstanceReboot(instance, reboot_type):
974   """Reboot an instance.
975
976   @type instance: L{objects.Instance}
977   @param instance: the instance object to reboot
978   @type reboot_type: str
979   @param reboot_type: the type of reboot, one the following
980     constants:
981       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
982         instance OS, do not recreate the VM
983       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
984         restart the VM (at the hypervisor level)
985       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
986         is not accepted here, since that mode is handled
987         differently
988   @rtype: boolean
989   @return: the success of the operation
990
991   """
992   running_instances = GetInstanceList([instance.hypervisor])
993
994   if instance.name not in running_instances:
995     _Fail("Cannot reboot instance %s that is not running", instance.name)
996
997   hyper = hypervisor.GetHypervisor(instance.hypervisor)
998   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
999     try:
1000       hyper.RebootInstance(instance)
1001     except errors.HypervisorError, err:
1002       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1003   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1004     try:
1005       stop_result = InstanceShutdown(instance)
1006       if not stop_result[0]:
1007         return stop_result
1008       return StartInstance(instance)
1009     except errors.HypervisorError, err:
1010       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1011   else:
1012     _Fail("Invalid reboot_type received: %s", reboot_type)
1013
1014   return (True, "Reboot successful")
1015
1016
1017 def MigrationInfo(instance):
1018   """Gather information about an instance to be migrated.
1019
1020   @type instance: L{objects.Instance}
1021   @param instance: the instance definition
1022
1023   """
1024   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1025   try:
1026     info = hyper.MigrationInfo(instance)
1027   except errors.HypervisorError, err:
1028     _Fail("Failed to fetch migration information: %s", err, exc=True)
1029   return (True, info)
1030
1031
1032 def AcceptInstance(instance, info, target):
1033   """Prepare the node to accept an instance.
1034
1035   @type instance: L{objects.Instance}
1036   @param instance: the instance definition
1037   @type info: string/data (opaque)
1038   @param info: migration information, from the source node
1039   @type target: string
1040   @param target: target host (usually ip), on this node
1041
1042   """
1043   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1044   try:
1045     hyper.AcceptInstance(instance, info, target)
1046   except errors.HypervisorError, err:
1047     _Fail("Failed to accept instance: %s", err, exc=True)
1048   return (True, "Accept successfull")
1049
1050
1051 def FinalizeMigration(instance, info, success):
1052   """Finalize any preparation to accept an instance.
1053
1054   @type instance: L{objects.Instance}
1055   @param instance: the instance definition
1056   @type info: string/data (opaque)
1057   @param info: migration information, from the source node
1058   @type success: boolean
1059   @param success: whether the migration was a success or a failure
1060
1061   """
1062   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1063   try:
1064     hyper.FinalizeMigration(instance, info, success)
1065   except errors.HypervisorError, err:
1066     _Fail("Failed to finalize migration: %s", err, exc=True)
1067   return (True, "Migration Finalized")
1068
1069
1070 def MigrateInstance(instance, target, live):
1071   """Migrates an instance to another node.
1072
1073   @type instance: L{objects.Instance}
1074   @param instance: the instance definition
1075   @type target: string
1076   @param target: the target node name
1077   @type live: boolean
1078   @param live: whether the migration should be done live or not (the
1079       interpretation of this parameter is left to the hypervisor)
1080   @rtype: tuple
1081   @return: a tuple of (success, msg) where:
1082       - succes is a boolean denoting the success/failure of the operation
1083       - msg is a string with details in case of failure
1084
1085   """
1086   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1087
1088   try:
1089     hyper.MigrateInstance(instance.name, target, live)
1090   except errors.HypervisorError, err:
1091     _Fail("Failed to migrate instance: %s", err, exc=True)
1092   return (True, "Migration successfull")
1093
1094
1095 def BlockdevCreate(disk, size, owner, on_primary, info):
1096   """Creates a block device for an instance.
1097
1098   @type disk: L{objects.Disk}
1099   @param disk: the object describing the disk we should create
1100   @type size: int
1101   @param size: the size of the physical underlying device, in MiB
1102   @type owner: str
1103   @param owner: the name of the instance for which disk is created,
1104       used for device cache data
1105   @type on_primary: boolean
1106   @param on_primary:  indicates if it is the primary node or not
1107   @type info: string
1108   @param info: string that will be sent to the physical device
1109       creation, used for example to set (LVM) tags on LVs
1110
1111   @return: the new unique_id of the device (this can sometime be
1112       computed only after creation), or None. On secondary nodes,
1113       it's not required to return anything.
1114
1115   """
1116   clist = []
1117   if disk.children:
1118     for child in disk.children:
1119       try:
1120         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1121       except errors.BlockDeviceError, err:
1122         _Fail("Can't assemble device %s: %s", child, err)
1123       if on_primary or disk.AssembleOnSecondary():
1124         # we need the children open in case the device itself has to
1125         # be assembled
1126         try:
1127           crdev.Open()
1128         except errors.BlockDeviceError, err:
1129           _Fail("Can't make child '%s' read-write: %s", child, err)
1130       clist.append(crdev)
1131
1132   try:
1133     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1134   except errors.BlockDeviceError, err:
1135     _Fail("Can't create block device: %s", err)
1136
1137   if on_primary or disk.AssembleOnSecondary():
1138     try:
1139       device.Assemble()
1140     except errors.BlockDeviceError, err:
1141       _Fail("Can't assemble device after creation, unusual event: %s", err)
1142     device.SetSyncSpeed(constants.SYNC_SPEED)
1143     if on_primary or disk.OpenOnSecondary():
1144       try:
1145         device.Open(force=True)
1146       except errors.BlockDeviceError, err:
1147         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1148     DevCacheManager.UpdateCache(device.dev_path, owner,
1149                                 on_primary, disk.iv_name)
1150
1151   device.SetInfo(info)
1152
1153   physical_id = device.unique_id
1154   return True, physical_id
1155
1156
1157 def BlockdevRemove(disk):
1158   """Remove a block device.
1159
1160   @note: This is intended to be called recursively.
1161
1162   @type disk: L{objects.Disk}
1163   @param disk: the disk object we should remove
1164   @rtype: boolean
1165   @return: the success of the operation
1166
1167   """
1168   msgs = []
1169   result = True
1170   try:
1171     rdev = _RecursiveFindBD(disk)
1172   except errors.BlockDeviceError, err:
1173     # probably can't attach
1174     logging.info("Can't attach to device %s in remove", disk)
1175     rdev = None
1176   if rdev is not None:
1177     r_path = rdev.dev_path
1178     try:
1179       rdev.Remove()
1180     except errors.BlockDeviceError, err:
1181       msgs.append(str(err))
1182       result = False
1183     if result:
1184       DevCacheManager.RemoveCache(r_path)
1185
1186   if disk.children:
1187     for child in disk.children:
1188       c_status, c_msg = BlockdevRemove(child)
1189       result = result and c_status
1190       if c_msg: # not an empty message
1191         msgs.append(c_msg)
1192
1193   return (result, "; ".join(msgs))
1194
1195
1196 def _RecursiveAssembleBD(disk, owner, as_primary):
1197   """Activate a block device for an instance.
1198
1199   This is run on the primary and secondary nodes for an instance.
1200
1201   @note: this function is called recursively.
1202
1203   @type disk: L{objects.Disk}
1204   @param disk: the disk we try to assemble
1205   @type owner: str
1206   @param owner: the name of the instance which owns the disk
1207   @type as_primary: boolean
1208   @param as_primary: if we should make the block device
1209       read/write
1210
1211   @return: the assembled device or None (in case no device
1212       was assembled)
1213   @raise errors.BlockDeviceError: in case there is an error
1214       during the activation of the children or the device
1215       itself
1216
1217   """
1218   children = []
1219   if disk.children:
1220     mcn = disk.ChildrenNeeded()
1221     if mcn == -1:
1222       mcn = 0 # max number of Nones allowed
1223     else:
1224       mcn = len(disk.children) - mcn # max number of Nones
1225     for chld_disk in disk.children:
1226       try:
1227         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1228       except errors.BlockDeviceError, err:
1229         if children.count(None) >= mcn:
1230           raise
1231         cdev = None
1232         logging.error("Error in child activation (but continuing): %s",
1233                       str(err))
1234       children.append(cdev)
1235
1236   if as_primary or disk.AssembleOnSecondary():
1237     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1238     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1239     result = r_dev
1240     if as_primary or disk.OpenOnSecondary():
1241       r_dev.Open()
1242     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1243                                 as_primary, disk.iv_name)
1244
1245   else:
1246     result = True
1247   return result
1248
1249
1250 def BlockdevAssemble(disk, owner, as_primary):
1251   """Activate a block device for an instance.
1252
1253   This is a wrapper over _RecursiveAssembleBD.
1254
1255   @rtype: str or boolean
1256   @return: a C{/dev/...} path for primary nodes, and
1257       C{True} for secondary nodes
1258
1259   """
1260   status = True
1261   result = "no error information"
1262   try:
1263     result = _RecursiveAssembleBD(disk, owner, as_primary)
1264     if isinstance(result, bdev.BlockDev):
1265       result = result.dev_path
1266   except errors.BlockDeviceError, err:
1267     result = "Error while assembling disk: %s" % str(err)
1268     status = False
1269   return (status, result)
1270
1271
1272 def BlockdevShutdown(disk):
1273   """Shut down a block device.
1274
1275   First, if the device is assembled (Attach() is successfull), then
1276   the device is shutdown. Then the children of the device are
1277   shutdown.
1278
1279   This function is called recursively. Note that we don't cache the
1280   children or such, as oppossed to assemble, shutdown of different
1281   devices doesn't require that the upper device was active.
1282
1283   @type disk: L{objects.Disk}
1284   @param disk: the description of the disk we should
1285       shutdown
1286   @rtype: boolean
1287   @return: the success of the operation
1288
1289   """
1290   msgs = []
1291   result = True
1292   r_dev = _RecursiveFindBD(disk)
1293   if r_dev is not None:
1294     r_path = r_dev.dev_path
1295     try:
1296       r_dev.Shutdown()
1297       DevCacheManager.RemoveCache(r_path)
1298     except errors.BlockDeviceError, err:
1299       msgs.append(str(err))
1300       result = False
1301
1302   if disk.children:
1303     for child in disk.children:
1304       c_status, c_msg = BlockdevShutdown(child)
1305       result = result and c_status
1306       if c_msg: # not an empty message
1307         msgs.append(c_msg)
1308
1309   return (result, "; ".join(msgs))
1310
1311
1312 def BlockdevAddchildren(parent_cdev, new_cdevs):
1313   """Extend a mirrored block device.
1314
1315   @type parent_cdev: L{objects.Disk}
1316   @param parent_cdev: the disk to which we should add children
1317   @type new_cdevs: list of L{objects.Disk}
1318   @param new_cdevs: the list of children which we should add
1319   @rtype: boolean
1320   @return: the success of the operation
1321
1322   """
1323   parent_bdev = _RecursiveFindBD(parent_cdev)
1324   if parent_bdev is None:
1325     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1326   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1327   if new_bdevs.count(None) > 0:
1328     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1329   parent_bdev.AddChildren(new_bdevs)
1330   return (True, None)
1331
1332
1333 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1334   """Shrink a mirrored block device.
1335
1336   @type parent_cdev: L{objects.Disk}
1337   @param parent_cdev: the disk from which we should remove children
1338   @type new_cdevs: list of L{objects.Disk}
1339   @param new_cdevs: the list of children which we should remove
1340   @rtype: boolean
1341   @return: the success of the operation
1342
1343   """
1344   parent_bdev = _RecursiveFindBD(parent_cdev)
1345   if parent_bdev is None:
1346     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1347   devs = []
1348   for disk in new_cdevs:
1349     rpath = disk.StaticDevPath()
1350     if rpath is None:
1351       bd = _RecursiveFindBD(disk)
1352       if bd is None:
1353         _Fail("Can't find device %s while removing children", disk)
1354       else:
1355         devs.append(bd.dev_path)
1356     else:
1357       devs.append(rpath)
1358   parent_bdev.RemoveChildren(devs)
1359   return (True, None)
1360
1361
1362 def BlockdevGetmirrorstatus(disks):
1363   """Get the mirroring status of a list of devices.
1364
1365   @type disks: list of L{objects.Disk}
1366   @param disks: the list of disks which we should query
1367   @rtype: disk
1368   @return:
1369       a list of (mirror_done, estimated_time) tuples, which
1370       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1371   @raise errors.BlockDeviceError: if any of the disks cannot be
1372       found
1373
1374   """
1375   stats = []
1376   for dsk in disks:
1377     rbd = _RecursiveFindBD(dsk)
1378     if rbd is None:
1379       _Fail("Can't find device %s", dsk)
1380     stats.append(rbd.CombinedSyncStatus())
1381   return True, stats
1382
1383
1384 def _RecursiveFindBD(disk):
1385   """Check if a device is activated.
1386
1387   If so, return informations about the real device.
1388
1389   @type disk: L{objects.Disk}
1390   @param disk: the disk object we need to find
1391
1392   @return: None if the device can't be found,
1393       otherwise the device instance
1394
1395   """
1396   children = []
1397   if disk.children:
1398     for chdisk in disk.children:
1399       children.append(_RecursiveFindBD(chdisk))
1400
1401   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1402
1403
1404 def BlockdevFind(disk):
1405   """Check if a device is activated.
1406
1407   If it is, return informations about the real device.
1408
1409   @type disk: L{objects.Disk}
1410   @param disk: the disk to find
1411   @rtype: None or tuple
1412   @return: None if the disk cannot be found, otherwise a
1413       tuple (device_path, major, minor, sync_percent,
1414       estimated_time, is_degraded)
1415
1416   """
1417   try:
1418     rbd = _RecursiveFindBD(disk)
1419   except errors.BlockDeviceError, err:
1420     _Fail("Failed to find device: %s", err, exc=True)
1421   if rbd is None:
1422     return (True, None)
1423   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1424
1425
1426 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1427   """Write a file to the filesystem.
1428
1429   This allows the master to overwrite(!) a file. It will only perform
1430   the operation if the file belongs to a list of configuration files.
1431
1432   @type file_name: str
1433   @param file_name: the target file name
1434   @type data: str
1435   @param data: the new contents of the file
1436   @type mode: int
1437   @param mode: the mode to give the file (can be None)
1438   @type uid: int
1439   @param uid: the owner of the file (can be -1 for default)
1440   @type gid: int
1441   @param gid: the group of the file (can be -1 for default)
1442   @type atime: float
1443   @param atime: the atime to set on the file (can be None)
1444   @type mtime: float
1445   @param mtime: the mtime to set on the file (can be None)
1446   @rtype: boolean
1447   @return: the success of the operation; errors are logged
1448       in the node daemon log
1449
1450   """
1451   if not os.path.isabs(file_name):
1452     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1453
1454   allowed_files = set([
1455     constants.CLUSTER_CONF_FILE,
1456     constants.ETC_HOSTS,
1457     constants.SSH_KNOWN_HOSTS_FILE,
1458     constants.VNC_PASSWORD_FILE,
1459     constants.RAPI_CERT_FILE,
1460     constants.RAPI_USERS_FILE,
1461     ])
1462
1463   for hv_name in constants.HYPER_TYPES:
1464     hv_class = hypervisor.GetHypervisor(hv_name)
1465     allowed_files.update(hv_class.GetAncillaryFiles())
1466
1467   if file_name not in allowed_files:
1468     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1469           file_name)
1470
1471   raw_data = _Decompress(data)
1472
1473   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1474                   atime=atime, mtime=mtime)
1475   return (True, "success")
1476
1477
1478 def WriteSsconfFiles(values):
1479   """Update all ssconf files.
1480
1481   Wrapper around the SimpleStore.WriteFiles.
1482
1483   """
1484   ssconf.SimpleStore().WriteFiles(values)
1485
1486
1487 def _ErrnoOrStr(err):
1488   """Format an EnvironmentError exception.
1489
1490   If the L{err} argument has an errno attribute, it will be looked up
1491   and converted into a textual C{E...} description. Otherwise the
1492   string representation of the error will be returned.
1493
1494   @type err: L{EnvironmentError}
1495   @param err: the exception to format
1496
1497   """
1498   if hasattr(err, 'errno'):
1499     detail = errno.errorcode[err.errno]
1500   else:
1501     detail = str(err)
1502   return detail
1503
1504
1505 def _OSOndiskVersion(name, os_dir):
1506   """Compute and return the API version of a given OS.
1507
1508   This function will try to read the API version of the OS given by
1509   the 'name' parameter and residing in the 'os_dir' directory.
1510
1511   @type name: str
1512   @param name: the OS name we should look for
1513   @type os_dir: str
1514   @param os_dir: the directory inwhich we should look for the OS
1515   @rtype: int or None
1516   @return:
1517       Either an integer denoting the version or None in the
1518       case when this is not a valid OS name.
1519   @raise errors.InvalidOS: if the OS cannot be found
1520
1521   """
1522   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1523
1524   try:
1525     st = os.stat(api_file)
1526   except EnvironmentError, err:
1527     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1528                            " found (%s)" % _ErrnoOrStr(err))
1529
1530   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1531     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1532                            " a regular file")
1533
1534   try:
1535     f = open(api_file)
1536     try:
1537       api_versions = f.readlines()
1538     finally:
1539       f.close()
1540   except EnvironmentError, err:
1541     raise errors.InvalidOS(name, os_dir, "error while reading the"
1542                            " API version (%s)" % _ErrnoOrStr(err))
1543
1544   api_versions = [version.strip() for version in api_versions]
1545   try:
1546     api_versions = [int(version) for version in api_versions]
1547   except (TypeError, ValueError), err:
1548     raise errors.InvalidOS(name, os_dir,
1549                            "API version is not integer (%s)" % str(err))
1550
1551   return api_versions
1552
1553
1554 def DiagnoseOS(top_dirs=None):
1555   """Compute the validity for all OSes.
1556
1557   @type top_dirs: list
1558   @param top_dirs: the list of directories in which to
1559       search (if not given defaults to
1560       L{constants.OS_SEARCH_PATH})
1561   @rtype: list of L{objects.OS}
1562   @return: an OS object for each name in all the given
1563       directories
1564
1565   """
1566   if top_dirs is None:
1567     top_dirs = constants.OS_SEARCH_PATH
1568
1569   result = []
1570   for dir_name in top_dirs:
1571     if os.path.isdir(dir_name):
1572       try:
1573         f_names = utils.ListVisibleFiles(dir_name)
1574       except EnvironmentError, err:
1575         logging.exception("Can't list the OS directory %s", dir_name)
1576         break
1577       for name in f_names:
1578         try:
1579           os_inst = OSFromDisk(name, base_dir=dir_name)
1580           result.append(os_inst)
1581         except errors.InvalidOS, err:
1582           result.append(objects.OS.FromInvalidOS(err))
1583
1584   return result
1585
1586
1587 def OSFromDisk(name, base_dir=None):
1588   """Create an OS instance from disk.
1589
1590   This function will return an OS instance if the given name is a
1591   valid OS name. Otherwise, it will raise an appropriate
1592   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1593
1594   @type base_dir: string
1595   @keyword base_dir: Base directory containing OS installations.
1596                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1597   @rtype: L{objects.OS}
1598   @return: the OS instance if we find a valid one
1599   @raise errors.InvalidOS: if we don't find a valid OS
1600
1601   """
1602   if base_dir is None:
1603     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1604     if os_dir is None:
1605       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1606   else:
1607     os_dir = os.path.sep.join([base_dir, name])
1608
1609   api_versions = _OSOndiskVersion(name, os_dir)
1610
1611   if constants.OS_API_VERSION not in api_versions:
1612     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1613                            " (found %s want %s)"
1614                            % (api_versions, constants.OS_API_VERSION))
1615
1616   # OS Scripts dictionary, we will populate it with the actual script names
1617   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1618
1619   for script in os_scripts:
1620     os_scripts[script] = os.path.sep.join([os_dir, script])
1621
1622     try:
1623       st = os.stat(os_scripts[script])
1624     except EnvironmentError, err:
1625       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1626                              (script, _ErrnoOrStr(err)))
1627
1628     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1629       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1630                              script)
1631
1632     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1633       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1634                              script)
1635
1636
1637   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1638                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1639                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1640                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1641                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1642                     api_versions=api_versions)
1643
1644 def OSEnvironment(instance, debug=0):
1645   """Calculate the environment for an os script.
1646
1647   @type instance: L{objects.Instance}
1648   @param instance: target instance for the os script run
1649   @type debug: integer
1650   @param debug: debug level (0 or 1, for OS Api 10)
1651   @rtype: dict
1652   @return: dict of environment variables
1653   @raise errors.BlockDeviceError: if the block device
1654       cannot be found
1655
1656   """
1657   result = {}
1658   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1659   result['INSTANCE_NAME'] = instance.name
1660   result['INSTANCE_OS'] = instance.os
1661   result['HYPERVISOR'] = instance.hypervisor
1662   result['DISK_COUNT'] = '%d' % len(instance.disks)
1663   result['NIC_COUNT'] = '%d' % len(instance.nics)
1664   result['DEBUG_LEVEL'] = '%d' % debug
1665   for idx, disk in enumerate(instance.disks):
1666     real_disk = _RecursiveFindBD(disk)
1667     if real_disk is None:
1668       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1669                                     str(disk))
1670     real_disk.Open()
1671     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1672     result['DISK_%d_ACCESS' % idx] = disk.mode
1673     if constants.HV_DISK_TYPE in instance.hvparams:
1674       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1675         instance.hvparams[constants.HV_DISK_TYPE]
1676     if disk.dev_type in constants.LDS_BLOCK:
1677       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1678     elif disk.dev_type == constants.LD_FILE:
1679       result['DISK_%d_BACKEND_TYPE' % idx] = \
1680         'file:%s' % disk.physical_id[0]
1681   for idx, nic in enumerate(instance.nics):
1682     result['NIC_%d_MAC' % idx] = nic.mac
1683     if nic.ip:
1684       result['NIC_%d_IP' % idx] = nic.ip
1685     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1686     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1687       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1688     if nic.nicparams[constants.NIC_LINK]:
1689       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1690     if constants.HV_NIC_TYPE in instance.hvparams:
1691       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1692         instance.hvparams[constants.HV_NIC_TYPE]
1693
1694   return result
1695
1696 def BlockdevGrow(disk, amount):
1697   """Grow a stack of block devices.
1698
1699   This function is called recursively, with the childrens being the
1700   first ones to resize.
1701
1702   @type disk: L{objects.Disk}
1703   @param disk: the disk to be grown
1704   @rtype: (status, result)
1705   @return: a tuple with the status of the operation
1706       (True/False), and the errors message if status
1707       is False
1708
1709   """
1710   r_dev = _RecursiveFindBD(disk)
1711   if r_dev is None:
1712     return False, "Cannot find block device %s" % (disk,)
1713
1714   try:
1715     r_dev.Grow(amount)
1716   except errors.BlockDeviceError, err:
1717     _Fail("Failed to grow block device: %s", err, exc=True)
1718
1719   return True, None
1720
1721
1722 def BlockdevSnapshot(disk):
1723   """Create a snapshot copy of a block device.
1724
1725   This function is called recursively, and the snapshot is actually created
1726   just for the leaf lvm backend device.
1727
1728   @type disk: L{objects.Disk}
1729   @param disk: the disk to be snapshotted
1730   @rtype: string
1731   @return: snapshot disk path
1732
1733   """
1734   if disk.children:
1735     if len(disk.children) == 1:
1736       # only one child, let's recurse on it
1737       return BlockdevSnapshot(disk.children[0])
1738     else:
1739       # more than one child, choose one that matches
1740       for child in disk.children:
1741         if child.size == disk.size:
1742           # return implies breaking the loop
1743           return BlockdevSnapshot(child)
1744   elif disk.dev_type == constants.LD_LV:
1745     r_dev = _RecursiveFindBD(disk)
1746     if r_dev is not None:
1747       # let's stay on the safe side and ask for the full size, for now
1748       return True, r_dev.Snapshot(disk.size)
1749     else:
1750       _Fail("Cannot find block device %s", disk)
1751   else:
1752     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1753           disk.unique_id, disk.dev_type)
1754
1755
1756 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1757   """Export a block device snapshot to a remote node.
1758
1759   @type disk: L{objects.Disk}
1760   @param disk: the description of the disk to export
1761   @type dest_node: str
1762   @param dest_node: the destination node to export to
1763   @type instance: L{objects.Instance}
1764   @param instance: the instance object to whom the disk belongs
1765   @type cluster_name: str
1766   @param cluster_name: the cluster name, needed for SSH hostalias
1767   @type idx: int
1768   @param idx: the index of the disk in the instance's disk list,
1769       used to export to the OS scripts environment
1770   @rtype: boolean
1771   @return: the success of the operation
1772
1773   """
1774   export_env = OSEnvironment(instance)
1775
1776   inst_os = OSFromDisk(instance.os)
1777   export_script = inst_os.export_script
1778
1779   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1780                                      instance.name, int(time.time()))
1781   if not os.path.exists(constants.LOG_OS_DIR):
1782     os.mkdir(constants.LOG_OS_DIR, 0750)
1783   real_disk = _RecursiveFindBD(disk)
1784   if real_disk is None:
1785     _Fail("Block device '%s' is not set up", disk)
1786
1787   real_disk.Open()
1788
1789   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1790   export_env['EXPORT_INDEX'] = str(idx)
1791
1792   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1793   destfile = disk.physical_id[1]
1794
1795   # the target command is built out of three individual commands,
1796   # which are joined by pipes; we check each individual command for
1797   # valid parameters
1798   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1799                                export_script, logfile)
1800
1801   comprcmd = "gzip"
1802
1803   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1804                                 destdir, destdir, destfile)
1805   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1806                                                    constants.GANETI_RUNAS,
1807                                                    destcmd)
1808
1809   # all commands have been checked, so we're safe to combine them
1810   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1811
1812   result = utils.RunCmd(command, env=export_env)
1813
1814   if result.failed:
1815     _Fail("OS snapshot export command '%s' returned error: %s"
1816           " output: %s", command, result.fail_reason, result.output)
1817
1818   return (True, None)
1819
1820
1821 def FinalizeExport(instance, snap_disks):
1822   """Write out the export configuration information.
1823
1824   @type instance: L{objects.Instance}
1825   @param instance: the instance which we export, used for
1826       saving configuration
1827   @type snap_disks: list of L{objects.Disk}
1828   @param snap_disks: list of snapshot block devices, which
1829       will be used to get the actual name of the dump file
1830
1831   @rtype: boolean
1832   @return: the success of the operation
1833
1834   """
1835   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1836   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1837
1838   config = objects.SerializableConfigParser()
1839
1840   config.add_section(constants.INISECT_EXP)
1841   config.set(constants.INISECT_EXP, 'version', '0')
1842   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1843   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1844   config.set(constants.INISECT_EXP, 'os', instance.os)
1845   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1846
1847   config.add_section(constants.INISECT_INS)
1848   config.set(constants.INISECT_INS, 'name', instance.name)
1849   config.set(constants.INISECT_INS, 'memory', '%d' %
1850              instance.beparams[constants.BE_MEMORY])
1851   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1852              instance.beparams[constants.BE_VCPUS])
1853   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1854
1855   nic_total = 0
1856   for nic_count, nic in enumerate(instance.nics):
1857     nic_total += 1
1858     config.set(constants.INISECT_INS, 'nic%d_mac' %
1859                nic_count, '%s' % nic.mac)
1860     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1861     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1862                '%s' % nic.bridge)
1863   # TODO: redundant: on load can read nics until it doesn't exist
1864   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1865
1866   disk_total = 0
1867   for disk_count, disk in enumerate(snap_disks):
1868     if disk:
1869       disk_total += 1
1870       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1871                  ('%s' % disk.iv_name))
1872       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1873                  ('%s' % disk.physical_id[1]))
1874       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1875                  ('%d' % disk.size))
1876
1877   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1878
1879   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1880                   data=config.Dumps())
1881   shutil.rmtree(finaldestdir, True)
1882   shutil.move(destdir, finaldestdir)
1883
1884   return True, None
1885
1886
1887 def ExportInfo(dest):
1888   """Get export configuration information.
1889
1890   @type dest: str
1891   @param dest: directory containing the export
1892
1893   @rtype: L{objects.SerializableConfigParser}
1894   @return: a serializable config file containing the
1895       export info
1896
1897   """
1898   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1899
1900   config = objects.SerializableConfigParser()
1901   config.read(cff)
1902
1903   if (not config.has_section(constants.INISECT_EXP) or
1904       not config.has_section(constants.INISECT_INS)):
1905     _Fail("Export info file doesn't have the required fields")
1906
1907   return True, config.Dumps()
1908
1909
1910 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1911   """Import an os image into an instance.
1912
1913   @type instance: L{objects.Instance}
1914   @param instance: instance to import the disks into
1915   @type src_node: string
1916   @param src_node: source node for the disk images
1917   @type src_images: list of string
1918   @param src_images: absolute paths of the disk images
1919   @rtype: list of boolean
1920   @return: each boolean represent the success of importing the n-th disk
1921
1922   """
1923   import_env = OSEnvironment(instance)
1924   inst_os = OSFromDisk(instance.os)
1925   import_script = inst_os.import_script
1926
1927   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1928                                         instance.name, int(time.time()))
1929   if not os.path.exists(constants.LOG_OS_DIR):
1930     os.mkdir(constants.LOG_OS_DIR, 0750)
1931
1932   comprcmd = "gunzip"
1933   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1934                                import_script, logfile)
1935
1936   final_result = []
1937   for idx, image in enumerate(src_images):
1938     if image:
1939       destcmd = utils.BuildShellCmd('cat %s', image)
1940       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1941                                                        constants.GANETI_RUNAS,
1942                                                        destcmd)
1943       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1944       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1945       import_env['IMPORT_INDEX'] = str(idx)
1946       result = utils.RunCmd(command, env=import_env)
1947       if result.failed:
1948         logging.error("Disk import command '%s' returned error: %s"
1949                       " output: %s", command, result.fail_reason,
1950                       result.output)
1951         final_result.append("error importing disk %d: %s, %s" %
1952                             (idx, result.fail_reason, result.output[-100]))
1953
1954   if final_result:
1955     return False, "; ".join(final_result)
1956   return True, None
1957
1958
1959 def ListExports():
1960   """Return a list of exports currently available on this machine.
1961
1962   @rtype: list
1963   @return: list of the exports
1964
1965   """
1966   if os.path.isdir(constants.EXPORT_DIR):
1967     return True, utils.ListVisibleFiles(constants.EXPORT_DIR)
1968   else:
1969     return False, "No exports directory"
1970
1971
1972 def RemoveExport(export):
1973   """Remove an existing export from the node.
1974
1975   @type export: str
1976   @param export: the name of the export to remove
1977   @rtype: boolean
1978   @return: the success of the operation
1979
1980   """
1981   target = os.path.join(constants.EXPORT_DIR, export)
1982
1983   try:
1984     shutil.rmtree(target)
1985   except EnvironmentError, err:
1986     _Fail("Error while removing the export: %s", err, exc=True)
1987
1988   return True, None
1989
1990
1991 def BlockdevRename(devlist):
1992   """Rename a list of block devices.
1993
1994   @type devlist: list of tuples
1995   @param devlist: list of tuples of the form  (disk,
1996       new_logical_id, new_physical_id); disk is an
1997       L{objects.Disk} object describing the current disk,
1998       and new logical_id/physical_id is the name we
1999       rename it to
2000   @rtype: boolean
2001   @return: True if all renames succeeded, False otherwise
2002
2003   """
2004   msgs = []
2005   result = True
2006   for disk, unique_id in devlist:
2007     dev = _RecursiveFindBD(disk)
2008     if dev is None:
2009       msgs.append("Can't find device %s in rename" % str(disk))
2010       result = False
2011       continue
2012     try:
2013       old_rpath = dev.dev_path
2014       dev.Rename(unique_id)
2015       new_rpath = dev.dev_path
2016       if old_rpath != new_rpath:
2017         DevCacheManager.RemoveCache(old_rpath)
2018         # FIXME: we should add the new cache information here, like:
2019         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2020         # but we don't have the owner here - maybe parse from existing
2021         # cache? for now, we only lose lvm data when we rename, which
2022         # is less critical than DRBD or MD
2023     except errors.BlockDeviceError, err:
2024       msgs.append("Can't rename device '%s' to '%s': %s" %
2025                   (dev, unique_id, err))
2026       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2027       result = False
2028   return (result, "; ".join(msgs))
2029
2030
2031 def _TransformFileStorageDir(file_storage_dir):
2032   """Checks whether given file_storage_dir is valid.
2033
2034   Checks wheter the given file_storage_dir is within the cluster-wide
2035   default file_storage_dir stored in SimpleStore. Only paths under that
2036   directory are allowed.
2037
2038   @type file_storage_dir: str
2039   @param file_storage_dir: the path to check
2040
2041   @return: the normalized path if valid, None otherwise
2042
2043   """
2044   cfg = _GetConfig()
2045   file_storage_dir = os.path.normpath(file_storage_dir)
2046   base_file_storage_dir = cfg.GetFileStorageDir()
2047   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2048       base_file_storage_dir):
2049     logging.error("file storage directory '%s' is not under base file"
2050                   " storage directory '%s'",
2051                   file_storage_dir, base_file_storage_dir)
2052     return None
2053   return file_storage_dir
2054
2055
2056 def CreateFileStorageDir(file_storage_dir):
2057   """Create file storage directory.
2058
2059   @type file_storage_dir: str
2060   @param file_storage_dir: directory to create
2061
2062   @rtype: tuple
2063   @return: tuple with first element a boolean indicating wheter dir
2064       creation was successful or not
2065
2066   """
2067   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2068   result = True,
2069   if not file_storage_dir:
2070     result = False,
2071   else:
2072     if os.path.exists(file_storage_dir):
2073       if not os.path.isdir(file_storage_dir):
2074         logging.error("'%s' is not a directory", file_storage_dir)
2075         result = False,
2076     else:
2077       try:
2078         os.makedirs(file_storage_dir, 0750)
2079       except OSError, err:
2080         logging.error("Cannot create file storage directory '%s': %s",
2081                       file_storage_dir, err)
2082         result = False,
2083   return result
2084
2085
2086 def RemoveFileStorageDir(file_storage_dir):
2087   """Remove file storage directory.
2088
2089   Remove it only if it's empty. If not log an error and return.
2090
2091   @type file_storage_dir: str
2092   @param file_storage_dir: the directory we should cleanup
2093   @rtype: tuple (success,)
2094   @return: tuple of one element, C{success}, denoting
2095       whether the operation was successfull
2096
2097   """
2098   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2099   result = True,
2100   if not file_storage_dir:
2101     result = False,
2102   else:
2103     if os.path.exists(file_storage_dir):
2104       if not os.path.isdir(file_storage_dir):
2105         logging.error("'%s' is not a directory", file_storage_dir)
2106         result = False,
2107       # deletes dir only if empty, otherwise we want to return False
2108       try:
2109         os.rmdir(file_storage_dir)
2110       except OSError, err:
2111         logging.exception("Cannot remove file storage directory '%s'",
2112                           file_storage_dir)
2113         result = False,
2114   return result
2115
2116
2117 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2118   """Rename the file storage directory.
2119
2120   @type old_file_storage_dir: str
2121   @param old_file_storage_dir: the current path
2122   @type new_file_storage_dir: str
2123   @param new_file_storage_dir: the name we should rename to
2124   @rtype: tuple (success,)
2125   @return: tuple of one element, C{success}, denoting
2126       whether the operation was successful
2127
2128   """
2129   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2130   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2131   result = True,
2132   if not old_file_storage_dir or not new_file_storage_dir:
2133     result = False,
2134   else:
2135     if not os.path.exists(new_file_storage_dir):
2136       if os.path.isdir(old_file_storage_dir):
2137         try:
2138           os.rename(old_file_storage_dir, new_file_storage_dir)
2139         except OSError, err:
2140           logging.exception("Cannot rename '%s' to '%s'",
2141                             old_file_storage_dir, new_file_storage_dir)
2142           result =  False,
2143       else:
2144         logging.error("'%s' is not a directory", old_file_storage_dir)
2145         result = False,
2146     else:
2147       if os.path.exists(old_file_storage_dir):
2148         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2149                       old_file_storage_dir, new_file_storage_dir)
2150         result = False,
2151   return result
2152
2153
2154 def _IsJobQueueFile(file_name):
2155   """Checks whether the given filename is in the queue directory.
2156
2157   @type file_name: str
2158   @param file_name: the file name we should check
2159   @rtype: boolean
2160   @return: whether the file is under the queue directory
2161
2162   """
2163   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2164   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2165
2166   if not result:
2167     logging.error("'%s' is not a file in the queue directory",
2168                   file_name)
2169
2170   return result
2171
2172
2173 def JobQueueUpdate(file_name, content):
2174   """Updates a file in the queue directory.
2175
2176   This is just a wrapper over L{utils.WriteFile}, with proper
2177   checking.
2178
2179   @type file_name: str
2180   @param file_name: the job file name
2181   @type content: str
2182   @param content: the new job contents
2183   @rtype: boolean
2184   @return: the success of the operation
2185
2186   """
2187   if not _IsJobQueueFile(file_name):
2188     return False
2189
2190   # Write and replace the file atomically
2191   utils.WriteFile(file_name, data=_Decompress(content))
2192
2193   return True
2194
2195
2196 def JobQueueRename(old, new):
2197   """Renames a job queue file.
2198
2199   This is just a wrapper over os.rename with proper checking.
2200
2201   @type old: str
2202   @param old: the old (actual) file name
2203   @type new: str
2204   @param new: the desired file name
2205   @rtype: boolean
2206   @return: the success of the operation
2207
2208   """
2209   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2210     return False
2211
2212   utils.RenameFile(old, new, mkdir=True)
2213
2214   return True
2215
2216
2217 def JobQueueSetDrainFlag(drain_flag):
2218   """Set the drain flag for the queue.
2219
2220   This will set or unset the queue drain flag.
2221
2222   @type drain_flag: boolean
2223   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2224   @rtype: boolean
2225   @return: always True
2226   @warning: the function always returns True
2227
2228   """
2229   if drain_flag:
2230     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2231   else:
2232     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2233
2234   return True
2235
2236
2237 def BlockdevClose(instance_name, disks):
2238   """Closes the given block devices.
2239
2240   This means they will be switched to secondary mode (in case of
2241   DRBD).
2242
2243   @param instance_name: if the argument is not empty, the symlinks
2244       of this instance will be removed
2245   @type disks: list of L{objects.Disk}
2246   @param disks: the list of disks to be closed
2247   @rtype: tuple (success, message)
2248   @return: a tuple of success and message, where success
2249       indicates the succes of the operation, and message
2250       which will contain the error details in case we
2251       failed
2252
2253   """
2254   bdevs = []
2255   for cf in disks:
2256     rd = _RecursiveFindBD(cf)
2257     if rd is None:
2258       _Fail("Can't find device %s", cf)
2259     bdevs.append(rd)
2260
2261   msg = []
2262   for rd in bdevs:
2263     try:
2264       rd.Close()
2265     except errors.BlockDeviceError, err:
2266       msg.append(str(err))
2267   if msg:
2268     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2269   else:
2270     if instance_name:
2271       _RemoveBlockDevLinks(instance_name, disks)
2272     return (True, "All devices secondary")
2273
2274
2275 def ValidateHVParams(hvname, hvparams):
2276   """Validates the given hypervisor parameters.
2277
2278   @type hvname: string
2279   @param hvname: the hypervisor name
2280   @type hvparams: dict
2281   @param hvparams: the hypervisor parameters to be validated
2282   @rtype: tuple (success, message)
2283   @return: a tuple of success and message, where success
2284       indicates the succes of the operation, and message
2285       which will contain the error details in case we
2286       failed
2287
2288   """
2289   try:
2290     hv_type = hypervisor.GetHypervisor(hvname)
2291     hv_type.ValidateParameters(hvparams)
2292     return (True, "Validation passed")
2293   except errors.HypervisorError, err:
2294     return (False, str(err))
2295
2296
2297 def DemoteFromMC():
2298   """Demotes the current node from master candidate role.
2299
2300   """
2301   # try to ensure we're not the master by mistake
2302   master, myself = ssconf.GetMasterAndMyself()
2303   if master == myself:
2304     return (False, "ssconf status shows I'm the master node, will not demote")
2305   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2306   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2307     return (False, "The master daemon is running, will not demote")
2308   try:
2309     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2310   except EnvironmentError, err:
2311     if err.errno != errno.ENOENT:
2312       return (False, "Error while backing up cluster file: %s" % str(err))
2313   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2314   return (True, "Done")
2315
2316
2317 def _FindDisks(nodes_ip, disks):
2318   """Sets the physical ID on disks and returns the block devices.
2319
2320   """
2321   # set the correct physical ID
2322   my_name = utils.HostInfo().name
2323   for cf in disks:
2324     cf.SetPhysicalID(my_name, nodes_ip)
2325
2326   bdevs = []
2327
2328   for cf in disks:
2329     rd = _RecursiveFindBD(cf)
2330     if rd is None:
2331       return (False, "Can't find device %s" % cf)
2332     bdevs.append(rd)
2333   return (True, bdevs)
2334
2335
2336 def DrbdDisconnectNet(nodes_ip, disks):
2337   """Disconnects the network on a list of drbd devices.
2338
2339   """
2340   status, bdevs = _FindDisks(nodes_ip, disks)
2341   if not status:
2342     return status, bdevs
2343
2344   # disconnect disks
2345   for rd in bdevs:
2346     try:
2347       rd.DisconnectNet()
2348     except errors.BlockDeviceError, err:
2349       _Fail("Can't change network configuration to standalone mode: %s",
2350             err, exc=True)
2351   return (True, "All disks are now disconnected")
2352
2353
2354 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2355   """Attaches the network on a list of drbd devices.
2356
2357   """
2358   status, bdevs = _FindDisks(nodes_ip, disks)
2359   if not status:
2360     return status, bdevs
2361
2362   if multimaster:
2363     for idx, rd in enumerate(bdevs):
2364       try:
2365         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2366       except EnvironmentError, err:
2367         _Fail("Can't create symlink: %s", err)
2368   # reconnect disks, switch to new master configuration and if
2369   # needed primary mode
2370   for rd in bdevs:
2371     try:
2372       rd.AttachNet(multimaster)
2373     except errors.BlockDeviceError, err:
2374       _Fail("Can't change network configuration: %s", err)
2375   # wait until the disks are connected; we need to retry the re-attach
2376   # if the device becomes standalone, as this might happen if the one
2377   # node disconnects and reconnects in a different mode before the
2378   # other node reconnects; in this case, one or both of the nodes will
2379   # decide it has wrong configuration and switch to standalone
2380   RECONNECT_TIMEOUT = 2 * 60
2381   sleep_time = 0.100 # start with 100 miliseconds
2382   timeout_limit = time.time() + RECONNECT_TIMEOUT
2383   while time.time() < timeout_limit:
2384     all_connected = True
2385     for rd in bdevs:
2386       stats = rd.GetProcStatus()
2387       if not (stats.is_connected or stats.is_in_resync):
2388         all_connected = False
2389       if stats.is_standalone:
2390         # peer had different config info and this node became
2391         # standalone, even though this should not happen with the
2392         # new staged way of changing disk configs
2393         try:
2394           rd.ReAttachNet(multimaster)
2395         except errors.BlockDeviceError, err:
2396           _Fail("Can't change network configuration: %s", err)
2397     if all_connected:
2398       break
2399     time.sleep(sleep_time)
2400     sleep_time = min(5, sleep_time * 1.5)
2401   if not all_connected:
2402     return (False, "Timeout in disk reconnecting")
2403   if multimaster:
2404     # change to primary mode
2405     for rd in bdevs:
2406       try:
2407         rd.Open()
2408       except errors.BlockDeviceError, err:
2409         _Fail("Can't change to primary mode: %s", err)
2410   if multimaster:
2411     msg = "multi-master and primary"
2412   else:
2413     msg = "single-master"
2414   return (True, "Disks are now configured as %s" % msg)
2415
2416
2417 def DrbdWaitSync(nodes_ip, disks):
2418   """Wait until DRBDs have synchronized.
2419
2420   """
2421   status, bdevs = _FindDisks(nodes_ip, disks)
2422   if not status:
2423     return status, bdevs
2424
2425   min_resync = 100
2426   alldone = True
2427   failure = False
2428   for rd in bdevs:
2429     stats = rd.GetProcStatus()
2430     if not (stats.is_connected or stats.is_in_resync):
2431       failure = True
2432       break
2433     alldone = alldone and (not stats.is_in_resync)
2434     if stats.sync_percent is not None:
2435       min_resync = min(min_resync, stats.sync_percent)
2436   return (not failure, (alldone, min_resync))
2437
2438
2439 def PowercycleNode(hypervisor_type):
2440   """Hard-powercycle the node.
2441
2442   Because we need to return first, and schedule the powercycle in the
2443   background, we won't be able to report failures nicely.
2444
2445   """
2446   hyper = hypervisor.GetHypervisor(hypervisor_type)
2447   try:
2448     pid = os.fork()
2449   except OSError, err:
2450     # if we can't fork, we'll pretend that we're in the child process
2451     pid = 0
2452   if pid > 0:
2453     return (True, "Reboot scheduled in 5 seconds")
2454   time.sleep(5)
2455   hyper.PowercycleNode()
2456
2457
2458 class HooksRunner(object):
2459   """Hook runner.
2460
2461   This class is instantiated on the node side (ganeti-noded) and not
2462   on the master side.
2463
2464   """
2465   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2466
2467   def __init__(self, hooks_base_dir=None):
2468     """Constructor for hooks runner.
2469
2470     @type hooks_base_dir: str or None
2471     @param hooks_base_dir: if not None, this overrides the
2472         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2473
2474     """
2475     if hooks_base_dir is None:
2476       hooks_base_dir = constants.HOOKS_BASE_DIR
2477     self._BASE_DIR = hooks_base_dir
2478
2479   @staticmethod
2480   def ExecHook(script, env):
2481     """Exec one hook script.
2482
2483     @type script: str
2484     @param script: the full path to the script
2485     @type env: dict
2486     @param env: the environment with which to exec the script
2487     @rtype: tuple (success, message)
2488     @return: a tuple of success and message, where success
2489         indicates the succes of the operation, and message
2490         which will contain the error details in case we
2491         failed
2492
2493     """
2494     # exec the process using subprocess and log the output
2495     fdstdin = None
2496     try:
2497       fdstdin = open("/dev/null", "r")
2498       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2499                                stderr=subprocess.STDOUT, close_fds=True,
2500                                shell=False, cwd="/", env=env)
2501       output = ""
2502       try:
2503         output = child.stdout.read(4096)
2504         child.stdout.close()
2505       except EnvironmentError, err:
2506         output += "Hook script error: %s" % str(err)
2507
2508       while True:
2509         try:
2510           result = child.wait()
2511           break
2512         except EnvironmentError, err:
2513           if err.errno == errno.EINTR:
2514             continue
2515           raise
2516     finally:
2517       # try not to leak fds
2518       for fd in (fdstdin, ):
2519         if fd is not None:
2520           try:
2521             fd.close()
2522           except EnvironmentError, err:
2523             # just log the error
2524             #logging.exception("Error while closing fd %s", fd)
2525             pass
2526
2527     return result == 0, utils.SafeEncode(output.strip())
2528
2529   def RunHooks(self, hpath, phase, env):
2530     """Run the scripts in the hooks directory.
2531
2532     @type hpath: str
2533     @param hpath: the path to the hooks directory which
2534         holds the scripts
2535     @type phase: str
2536     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2537         L{constants.HOOKS_PHASE_POST}
2538     @type env: dict
2539     @param env: dictionary with the environment for the hook
2540     @rtype: list
2541     @return: list of 3-element tuples:
2542       - script path
2543       - script result, either L{constants.HKR_SUCCESS} or
2544         L{constants.HKR_FAIL}
2545       - output of the script
2546
2547     @raise errors.ProgrammerError: for invalid input
2548         parameters
2549
2550     """
2551     if phase == constants.HOOKS_PHASE_PRE:
2552       suffix = "pre"
2553     elif phase == constants.HOOKS_PHASE_POST:
2554       suffix = "post"
2555     else:
2556       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2557     rr = []
2558
2559     subdir = "%s-%s.d" % (hpath, suffix)
2560     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2561     try:
2562       dir_contents = utils.ListVisibleFiles(dir_name)
2563     except OSError, err:
2564       # FIXME: must log output in case of failures
2565       return rr
2566
2567     # we use the standard python sort order,
2568     # so 00name is the recommended naming scheme
2569     dir_contents.sort()
2570     for relname in dir_contents:
2571       fname = os.path.join(dir_name, relname)
2572       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2573           self.RE_MASK.match(relname) is not None):
2574         rrval = constants.HKR_SKIP
2575         output = ""
2576       else:
2577         result, output = self.ExecHook(fname, env)
2578         if not result:
2579           rrval = constants.HKR_FAIL
2580         else:
2581           rrval = constants.HKR_SUCCESS
2582       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2583
2584     return rr
2585
2586
2587 class IAllocatorRunner(object):
2588   """IAllocator runner.
2589
2590   This class is instantiated on the node side (ganeti-noded) and not on
2591   the master side.
2592
2593   """
2594   def Run(self, name, idata):
2595     """Run an iallocator script.
2596
2597     @type name: str
2598     @param name: the iallocator script name
2599     @type idata: str
2600     @param idata: the allocator input data
2601
2602     @rtype: tuple
2603     @return: four element tuple of:
2604        - run status (one of the IARUN_ constants)
2605        - stdout
2606        - stderr
2607        - fail reason (as from L{utils.RunResult})
2608
2609     """
2610     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2611                                   os.path.isfile)
2612     if alloc_script is None:
2613       return (constants.IARUN_NOTFOUND, None, None, None)
2614
2615     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2616     try:
2617       os.write(fd, idata)
2618       os.close(fd)
2619       result = utils.RunCmd([alloc_script, fin_name])
2620       if result.failed:
2621         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2622                 result.fail_reason)
2623     finally:
2624       os.unlink(fin_name)
2625
2626     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2627
2628
2629 class DevCacheManager(object):
2630   """Simple class for managing a cache of block device information.
2631
2632   """
2633   _DEV_PREFIX = "/dev/"
2634   _ROOT_DIR = constants.BDEV_CACHE_DIR
2635
2636   @classmethod
2637   def _ConvertPath(cls, dev_path):
2638     """Converts a /dev/name path to the cache file name.
2639
2640     This replaces slashes with underscores and strips the /dev
2641     prefix. It then returns the full path to the cache file.
2642
2643     @type dev_path: str
2644     @param dev_path: the C{/dev/} path name
2645     @rtype: str
2646     @return: the converted path name
2647
2648     """
2649     if dev_path.startswith(cls._DEV_PREFIX):
2650       dev_path = dev_path[len(cls._DEV_PREFIX):]
2651     dev_path = dev_path.replace("/", "_")
2652     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2653     return fpath
2654
2655   @classmethod
2656   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2657     """Updates the cache information for a given device.
2658
2659     @type dev_path: str
2660     @param dev_path: the pathname of the device
2661     @type owner: str
2662     @param owner: the owner (instance name) of the device
2663     @type on_primary: bool
2664     @param on_primary: whether this is the primary
2665         node nor not
2666     @type iv_name: str
2667     @param iv_name: the instance-visible name of the
2668         device, as in objects.Disk.iv_name
2669
2670     @rtype: None
2671
2672     """
2673     if dev_path is None:
2674       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2675       return
2676     fpath = cls._ConvertPath(dev_path)
2677     if on_primary:
2678       state = "primary"
2679     else:
2680       state = "secondary"
2681     if iv_name is None:
2682       iv_name = "not_visible"
2683     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2684     try:
2685       utils.WriteFile(fpath, data=fdata)
2686     except EnvironmentError, err:
2687       logging.exception("Can't update bdev cache for %s", dev_path)
2688
2689   @classmethod
2690   def RemoveCache(cls, dev_path):
2691     """Remove data for a dev_path.
2692
2693     This is just a wrapper over L{utils.RemoveFile} with a converted
2694     path name and logging.
2695
2696     @type dev_path: str
2697     @param dev_path: the pathname of the device
2698
2699     @rtype: None
2700
2701     """
2702     if dev_path is None:
2703       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2704       return
2705     fpath = cls._ConvertPath(dev_path)
2706     try:
2707       utils.RemoveFile(fpath)
2708     except EnvironmentError, err:
2709       logging.exception("Can't update bdev cache for %s", dev_path)