Convert export_list rpc to new style result
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 class RPCFail(Exception):
50   """Class denoting RPC failure.
51
52   Its argument is the error message.
53
54   """
55
56 def _Fail(msg, *args, **kwargs):
57   """Log an error and the raise an RPCFail exception.
58
59   This exception is then handled specially in the ganeti daemon and
60   turned into a 'failed' return type. As such, this function is a
61   useful shortcut for logging the error and returning it to the master
62   daemon.
63
64   @type msg: string
65   @param msg: the text of the exception
66   @raise RPCFail
67
68   """
69   if args:
70     msg = msg % args
71   if "exc" in kwargs and kwargs["exc"]:
72     logging.exception(msg)
73   else:
74     logging.error(msg)
75   raise RPCFail(msg)
76
77
78 def _GetConfig():
79   """Simple wrapper to return a SimpleStore.
80
81   @rtype: L{ssconf.SimpleStore}
82   @return: a SimpleStore instance
83
84   """
85   return ssconf.SimpleStore()
86
87
88 def _GetSshRunner(cluster_name):
89   """Simple wrapper to return an SshRunner.
90
91   @type cluster_name: str
92   @param cluster_name: the cluster name, which is needed
93       by the SshRunner constructor
94   @rtype: L{ssh.SshRunner}
95   @return: an SshRunner instance
96
97   """
98   return ssh.SshRunner(cluster_name)
99
100
101 def _Decompress(data):
102   """Unpacks data compressed by the RPC client.
103
104   @type data: list or tuple
105   @param data: Data sent by RPC client
106   @rtype: str
107   @return: Decompressed data
108
109   """
110   assert isinstance(data, (list, tuple))
111   assert len(data) == 2
112   (encoding, content) = data
113   if encoding == constants.RPC_ENCODING_NONE:
114     return content
115   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
116     return zlib.decompress(base64.b64decode(content))
117   else:
118     raise AssertionError("Unknown data encoding")
119
120
121 def _CleanDirectory(path, exclude=None):
122   """Removes all regular files in a directory.
123
124   @type path: str
125   @param path: the directory to clean
126   @type exclude: list
127   @param exclude: list of files to be excluded, defaults
128       to the empty list
129
130   """
131   if not os.path.isdir(path):
132     return
133   if exclude is None:
134     exclude = []
135   else:
136     # Normalize excluded paths
137     exclude = [os.path.normpath(i) for i in exclude]
138
139   for rel_name in utils.ListVisibleFiles(path):
140     full_name = os.path.normpath(os.path.join(path, rel_name))
141     if full_name in exclude:
142       continue
143     if os.path.isfile(full_name) and not os.path.islink(full_name):
144       utils.RemoveFile(full_name)
145
146
147 def JobQueuePurge():
148   """Removes job queue files and archived jobs.
149
150   @rtype: None
151
152   """
153   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
154   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
155
156
157 def GetMasterInfo():
158   """Returns master information.
159
160   This is an utility function to compute master information, either
161   for consumption here or from the node daemon.
162
163   @rtype: tuple
164   @return: (master_netdev, master_ip, master_name) if we have a good
165       configuration, otherwise (None, None, None)
166
167   """
168   try:
169     cfg = _GetConfig()
170     master_netdev = cfg.GetMasterNetdev()
171     master_ip = cfg.GetMasterIP()
172     master_node = cfg.GetMasterNode()
173   except errors.ConfigurationError, err:
174     logging.exception("Cluster configuration incomplete")
175     return (None, None, None)
176   return (master_netdev, master_ip, master_node)
177
178
179 def StartMaster(start_daemons):
180   """Activate local node as master node.
181
182   The function will always try activate the IP address of the master
183   (unless someone else has it). It will also start the master daemons,
184   based on the start_daemons parameter.
185
186   @type start_daemons: boolean
187   @param start_daemons: whther to also start the master
188       daemons (ganeti-masterd and ganeti-rapi)
189   @rtype: None
190
191   """
192   ok = True
193   master_netdev, master_ip, _ = GetMasterInfo()
194   if not master_netdev:
195     return False
196
197   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
198     if utils.OwnIpAddress(master_ip):
199       # we already have the ip:
200       logging.debug("Already started")
201     else:
202       logging.error("Someone else has the master ip, not activating")
203       ok = False
204   else:
205     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
206                            "dev", master_netdev, "label",
207                            "%s:0" % master_netdev])
208     if result.failed:
209       logging.error("Can't activate master IP: %s", result.output)
210       ok = False
211
212     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
213                            "-s", master_ip, master_ip])
214     # we'll ignore the exit code of arping
215
216   # and now start the master and rapi daemons
217   if start_daemons:
218     for daemon in 'ganeti-masterd', 'ganeti-rapi':
219       result = utils.RunCmd([daemon])
220       if result.failed:
221         logging.error("Can't start daemon %s: %s", daemon, result.output)
222         ok = False
223   return ok
224
225
226 def StopMaster(stop_daemons):
227   """Deactivate this node as master.
228
229   The function will always try to deactivate the IP address of the
230   master. It will also stop the master daemons depending on the
231   stop_daemons parameter.
232
233   @type stop_daemons: boolean
234   @param stop_daemons: whether to also stop the master daemons
235       (ganeti-masterd and ganeti-rapi)
236   @rtype: None
237
238   """
239   master_netdev, master_ip, _ = GetMasterInfo()
240   if not master_netdev:
241     return False
242
243   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
244                          "dev", master_netdev])
245   if result.failed:
246     logging.error("Can't remove the master IP, error: %s", result.output)
247     # but otherwise ignore the failure
248
249   if stop_daemons:
250     # stop/kill the rapi and the master daemon
251     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
252       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
253
254   return True
255
256
257 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
258   """Joins this node to the cluster.
259
260   This does the following:
261       - updates the hostkeys of the machine (rsa and dsa)
262       - adds the ssh private key to the user
263       - adds the ssh public key to the users' authorized_keys file
264
265   @type dsa: str
266   @param dsa: the DSA private key to write
267   @type dsapub: str
268   @param dsapub: the DSA public key to write
269   @type rsa: str
270   @param rsa: the RSA private key to write
271   @type rsapub: str
272   @param rsapub: the RSA public key to write
273   @type sshkey: str
274   @param sshkey: the SSH private key to write
275   @type sshpub: str
276   @param sshpub: the SSH public key to write
277   @rtype: boolean
278   @return: the success of the operation
279
280   """
281   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
282                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
283                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
284                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
285   for name, content, mode in sshd_keys:
286     utils.WriteFile(name, data=content, mode=mode)
287
288   try:
289     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
290                                                     mkdir=True)
291   except errors.OpExecError, err:
292     _Fail("Error while processing user ssh files: %s", err, exc=True)
293
294   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
295     utils.WriteFile(name, data=content, mode=0600)
296
297   utils.AddAuthorizedKey(auth_keys, sshpub)
298
299   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
300
301   return (True, "Node added successfully")
302
303
304 def LeaveCluster():
305   """Cleans up and remove the current node.
306
307   This function cleans up and prepares the current node to be removed
308   from the cluster.
309
310   If processing is successful, then it raises an
311   L{errors.QuitGanetiException} which is used as a special case to
312   shutdown the node daemon.
313
314   """
315   _CleanDirectory(constants.DATA_DIR)
316   JobQueuePurge()
317
318   try:
319     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
320   except errors.OpExecError:
321     logging.exception("Error while processing ssh files")
322     return
323
324   f = open(pub_key, 'r')
325   try:
326     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
327   finally:
328     f.close()
329
330   utils.RemoveFile(priv_key)
331   utils.RemoveFile(pub_key)
332
333   # Return a reassuring string to the caller, and quit
334   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
335
336
337 def GetNodeInfo(vgname, hypervisor_type):
338   """Gives back a hash with different informations about the node.
339
340   @type vgname: C{string}
341   @param vgname: the name of the volume group to ask for disk space information
342   @type hypervisor_type: C{str}
343   @param hypervisor_type: the name of the hypervisor to ask for
344       memory information
345   @rtype: C{dict}
346   @return: dictionary with the following keys:
347       - vg_size is the size of the configured volume group in MiB
348       - vg_free is the free size of the volume group in MiB
349       - memory_dom0 is the memory allocated for domain0 in MiB
350       - memory_free is the currently available (free) ram in MiB
351       - memory_total is the total number of ram in MiB
352
353   """
354   outputarray = {}
355   vginfo = _GetVGInfo(vgname)
356   outputarray['vg_size'] = vginfo['vg_size']
357   outputarray['vg_free'] = vginfo['vg_free']
358
359   hyper = hypervisor.GetHypervisor(hypervisor_type)
360   hyp_info = hyper.GetNodeInfo()
361   if hyp_info is not None:
362     outputarray.update(hyp_info)
363
364   f = open("/proc/sys/kernel/random/boot_id", 'r')
365   try:
366     outputarray["bootid"] = f.read(128).rstrip("\n")
367   finally:
368     f.close()
369
370   return outputarray
371
372
373 def VerifyNode(what, cluster_name):
374   """Verify the status of the local node.
375
376   Based on the input L{what} parameter, various checks are done on the
377   local node.
378
379   If the I{filelist} key is present, this list of
380   files is checksummed and the file/checksum pairs are returned.
381
382   If the I{nodelist} key is present, we check that we have
383   connectivity via ssh with the target nodes (and check the hostname
384   report).
385
386   If the I{node-net-test} key is present, we check that we have
387   connectivity to the given nodes via both primary IP and, if
388   applicable, secondary IPs.
389
390   @type what: C{dict}
391   @param what: a dictionary of things to check:
392       - filelist: list of files for which to compute checksums
393       - nodelist: list of nodes we should check ssh communication with
394       - node-net-test: list of nodes we should check node daemon port
395         connectivity with
396       - hypervisor: list with hypervisors to run the verify for
397   @rtype: dict
398   @return: a dictionary with the same keys as the input dict, and
399       values representing the result of the checks
400
401   """
402   result = {}
403
404   if constants.NV_HYPERVISOR in what:
405     result[constants.NV_HYPERVISOR] = tmp = {}
406     for hv_name in what[constants.NV_HYPERVISOR]:
407       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
408
409   if constants.NV_FILELIST in what:
410     result[constants.NV_FILELIST] = utils.FingerprintFiles(
411       what[constants.NV_FILELIST])
412
413   if constants.NV_NODELIST in what:
414     result[constants.NV_NODELIST] = tmp = {}
415     random.shuffle(what[constants.NV_NODELIST])
416     for node in what[constants.NV_NODELIST]:
417       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
418       if not success:
419         tmp[node] = message
420
421   if constants.NV_NODENETTEST in what:
422     result[constants.NV_NODENETTEST] = tmp = {}
423     my_name = utils.HostInfo().name
424     my_pip = my_sip = None
425     for name, pip, sip in what[constants.NV_NODENETTEST]:
426       if name == my_name:
427         my_pip = pip
428         my_sip = sip
429         break
430     if not my_pip:
431       tmp[my_name] = ("Can't find my own primary/secondary IP"
432                       " in the node list")
433     else:
434       port = utils.GetNodeDaemonPort()
435       for name, pip, sip in what[constants.NV_NODENETTEST]:
436         fail = []
437         if not utils.TcpPing(pip, port, source=my_pip):
438           fail.append("primary")
439         if sip != pip:
440           if not utils.TcpPing(sip, port, source=my_sip):
441             fail.append("secondary")
442         if fail:
443           tmp[name] = ("failure using the %s interface(s)" %
444                        " and ".join(fail))
445
446   if constants.NV_LVLIST in what:
447     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
448
449   if constants.NV_INSTANCELIST in what:
450     result[constants.NV_INSTANCELIST] = GetInstanceList(
451       what[constants.NV_INSTANCELIST])
452
453   if constants.NV_VGLIST in what:
454     result[constants.NV_VGLIST] = ListVolumeGroups()
455
456   if constants.NV_VERSION in what:
457     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
458                                     constants.RELEASE_VERSION)
459
460   if constants.NV_HVINFO in what:
461     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
462     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
463
464   if constants.NV_DRBDLIST in what:
465     try:
466       used_minors = bdev.DRBD8.GetUsedDevs().keys()
467     except errors.BlockDeviceError, err:
468       logging.warning("Can't get used minors list", exc_info=True)
469       used_minors = str(err)
470     result[constants.NV_DRBDLIST] = used_minors
471
472   return result
473
474
475 def GetVolumeList(vg_name):
476   """Compute list of logical volumes and their size.
477
478   @type vg_name: str
479   @param vg_name: the volume group whose LVs we should list
480   @rtype: dict
481   @return:
482       dictionary of all partions (key) with value being a tuple of
483       their size (in MiB), inactive and online status::
484
485         {'test1': ('20.06', True, True)}
486
487       in case of errors, a string is returned with the error
488       details.
489
490   """
491   lvs = {}
492   sep = '|'
493   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
494                          "--separator=%s" % sep,
495                          "-olv_name,lv_size,lv_attr", vg_name])
496   if result.failed:
497     logging.error("Failed to list logical volumes, lvs output: %s",
498                   result.output)
499     return result.output
500
501   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
502   for line in result.stdout.splitlines():
503     line = line.strip()
504     match = valid_line_re.match(line)
505     if not match:
506       logging.error("Invalid line returned from lvs output: '%s'", line)
507       continue
508     name, size, attr = match.groups()
509     inactive = attr[4] == '-'
510     online = attr[5] == 'o'
511     lvs[name] = (size, inactive, online)
512
513   return lvs
514
515
516 def ListVolumeGroups():
517   """List the volume groups and their size.
518
519   @rtype: dict
520   @return: dictionary with keys volume name and values the
521       size of the volume
522
523   """
524   return utils.ListVolumeGroups()
525
526
527 def NodeVolumes():
528   """List all volumes on this node.
529
530   @rtype: list
531   @return:
532     A list of dictionaries, each having four keys:
533       - name: the logical volume name,
534       - size: the size of the logical volume
535       - dev: the physical device on which the LV lives
536       - vg: the volume group to which it belongs
537
538     In case of errors, we return an empty list and log the
539     error.
540
541     Note that since a logical volume can live on multiple physical
542     volumes, the resulting list might include a logical volume
543     multiple times.
544
545   """
546   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
547                          "--separator=|",
548                          "--options=lv_name,lv_size,devices,vg_name"])
549   if result.failed:
550     logging.error("Failed to list logical volumes, lvs output: %s",
551                   result.output)
552     return []
553
554   def parse_dev(dev):
555     if '(' in dev:
556       return dev.split('(')[0]
557     else:
558       return dev
559
560   def map_line(line):
561     return {
562       'name': line[0].strip(),
563       'size': line[1].strip(),
564       'dev': parse_dev(line[2].strip()),
565       'vg': line[3].strip(),
566     }
567
568   return [map_line(line.split('|')) for line in result.stdout.splitlines()
569           if line.count('|') >= 3]
570
571
572 def BridgesExist(bridges_list):
573   """Check if a list of bridges exist on the current node.
574
575   @rtype: boolean
576   @return: C{True} if all of them exist, C{False} otherwise
577
578   """
579   for bridge in bridges_list:
580     if not utils.BridgeExists(bridge):
581       return False
582
583   return True
584
585
586 def GetInstanceList(hypervisor_list):
587   """Provides a list of instances.
588
589   @type hypervisor_list: list
590   @param hypervisor_list: the list of hypervisors to query information
591
592   @rtype: list
593   @return: a list of all running instances on the current node
594     - instance1.example.com
595     - instance2.example.com
596
597   """
598   results = []
599   for hname in hypervisor_list:
600     try:
601       names = hypervisor.GetHypervisor(hname).ListInstances()
602       results.extend(names)
603     except errors.HypervisorError, err:
604       logging.exception("Error enumerating instances for hypevisor %s", hname)
605       raise
606
607   return results
608
609
610 def GetInstanceInfo(instance, hname):
611   """Gives back the informations about an instance as a dictionary.
612
613   @type instance: string
614   @param instance: the instance name
615   @type hname: string
616   @param hname: the hypervisor type of the instance
617
618   @rtype: dict
619   @return: dictionary with the following keys:
620       - memory: memory size of instance (int)
621       - state: xen state of instance (string)
622       - time: cpu time of instance (float)
623
624   """
625   output = {}
626
627   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
628   if iinfo is not None:
629     output['memory'] = iinfo[2]
630     output['state'] = iinfo[4]
631     output['time'] = iinfo[5]
632
633   return output
634
635
636 def GetInstanceMigratable(instance):
637   """Gives whether an instance can be migrated.
638
639   @type instance: L{objects.Instance}
640   @param instance: object representing the instance to be checked.
641
642   @rtype: tuple
643   @return: tuple of (result, description) where:
644       - result: whether the instance can be migrated or not
645       - description: a description of the issue, if relevant
646
647   """
648   hyper = hypervisor.GetHypervisor(instance.hypervisor)
649   if instance.name not in hyper.ListInstances():
650     return (False, 'not running')
651
652   for idx in range(len(instance.disks)):
653     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
654     if not os.path.islink(link_name):
655       return (False, 'not restarted since ganeti 1.2.5')
656
657   return (True, '')
658
659
660 def GetAllInstancesInfo(hypervisor_list):
661   """Gather data about all instances.
662
663   This is the equivalent of L{GetInstanceInfo}, except that it
664   computes data for all instances at once, thus being faster if one
665   needs data about more than one instance.
666
667   @type hypervisor_list: list
668   @param hypervisor_list: list of hypervisors to query for instance data
669
670   @rtype: dict
671   @return: dictionary of instance: data, with data having the following keys:
672       - memory: memory size of instance (int)
673       - state: xen state of instance (string)
674       - time: cpu time of instance (float)
675       - vcpus: the number of vcpus
676
677   """
678   output = {}
679
680   for hname in hypervisor_list:
681     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
682     if iinfo:
683       for name, inst_id, memory, vcpus, state, times in iinfo:
684         value = {
685           'memory': memory,
686           'vcpus': vcpus,
687           'state': state,
688           'time': times,
689           }
690         if name in output:
691           # we only check static parameters, like memory and vcpus,
692           # and not state and time which can change between the
693           # invocations of the different hypervisors
694           for key in 'memory', 'vcpus':
695             if value[key] != output[name][key]:
696               raise errors.HypervisorError("Instance %s is running twice"
697                                            " with different parameters" % name)
698         output[name] = value
699
700   return output
701
702
703 def InstanceOsAdd(instance, reinstall):
704   """Add an OS to an instance.
705
706   @type instance: L{objects.Instance}
707   @param instance: Instance whose OS is to be installed
708   @type reinstall: boolean
709   @param reinstall: whether this is an instance reinstall
710   @rtype: boolean
711   @return: the success of the operation
712
713   """
714   try:
715     inst_os = OSFromDisk(instance.os)
716   except errors.InvalidOS, err:
717     os_name, os_dir, os_err = err.args
718     if os_dir is None:
719       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
720     else:
721       return (False, "Error parsing OS '%s' in directory %s: %s" %
722               (os_name, os_dir, os_err))
723
724   create_env = OSEnvironment(instance)
725   if reinstall:
726     create_env['INSTANCE_REINSTALL'] = "1"
727
728   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
729                                      instance.name, int(time.time()))
730
731   result = utils.RunCmd([inst_os.create_script], env=create_env,
732                         cwd=inst_os.path, output=logfile,)
733   if result.failed:
734     logging.error("os create command '%s' returned error: %s, logfile: %s,"
735                   " output: %s", result.cmd, result.fail_reason, logfile,
736                   result.output)
737     lines = [utils.SafeEncode(val)
738              for val in utils.TailFile(logfile, lines=20)]
739     return (False, "OS create script failed (%s), last lines in the"
740             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
741
742   return (True, "Successfully installed")
743
744
745 def RunRenameInstance(instance, old_name):
746   """Run the OS rename script for an instance.
747
748   @type instance: L{objects.Instance}
749   @param instance: Instance whose OS is to be installed
750   @type old_name: string
751   @param old_name: previous instance name
752   @rtype: boolean
753   @return: the success of the operation
754
755   """
756   inst_os = OSFromDisk(instance.os)
757
758   rename_env = OSEnvironment(instance)
759   rename_env['OLD_INSTANCE_NAME'] = old_name
760
761   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
762                                            old_name,
763                                            instance.name, int(time.time()))
764
765   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
766                         cwd=inst_os.path, output=logfile)
767
768   if result.failed:
769     logging.error("os create command '%s' returned error: %s output: %s",
770                   result.cmd, result.fail_reason, result.output)
771     lines = [utils.SafeEncode(val)
772              for val in utils.TailFile(logfile, lines=20)]
773     return (False, "OS rename script failed (%s), last lines in the"
774             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
775
776   return (True, "Rename successful")
777
778
779 def _GetVGInfo(vg_name):
780   """Get informations about the volume group.
781
782   @type vg_name: str
783   @param vg_name: the volume group which we query
784   @rtype: dict
785   @return:
786     A dictionary with the following keys:
787       - C{vg_size} is the total size of the volume group in MiB
788       - C{vg_free} is the free size of the volume group in MiB
789       - C{pv_count} are the number of physical disks in that VG
790
791     If an error occurs during gathering of data, we return the same dict
792     with keys all set to None.
793
794   """
795   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
796
797   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
798                          "--nosuffix", "--units=m", "--separator=:", vg_name])
799
800   if retval.failed:
801     logging.error("volume group %s not present", vg_name)
802     return retdic
803   valarr = retval.stdout.strip().rstrip(':').split(':')
804   if len(valarr) == 3:
805     try:
806       retdic = {
807         "vg_size": int(round(float(valarr[0]), 0)),
808         "vg_free": int(round(float(valarr[1]), 0)),
809         "pv_count": int(valarr[2]),
810         }
811     except ValueError, err:
812       logging.exception("Fail to parse vgs output")
813   else:
814     logging.error("vgs output has the wrong number of fields (expected"
815                   " three): %s", str(valarr))
816   return retdic
817
818
819 def _GetBlockDevSymlinkPath(instance_name, idx):
820   return os.path.join(constants.DISK_LINKS_DIR,
821                       "%s:%d" % (instance_name, idx))
822
823
824 def _SymlinkBlockDev(instance_name, device_path, idx):
825   """Set up symlinks to a instance's block device.
826
827   This is an auxiliary function run when an instance is start (on the primary
828   node) or when an instance is migrated (on the target node).
829
830
831   @param instance_name: the name of the target instance
832   @param device_path: path of the physical block device, on the node
833   @param idx: the disk index
834   @return: absolute path to the disk's symlink
835
836   """
837   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
838   try:
839     os.symlink(device_path, link_name)
840   except OSError, err:
841     if err.errno == errno.EEXIST:
842       if (not os.path.islink(link_name) or
843           os.readlink(link_name) != device_path):
844         os.remove(link_name)
845         os.symlink(device_path, link_name)
846     else:
847       raise
848
849   return link_name
850
851
852 def _RemoveBlockDevLinks(instance_name, disks):
853   """Remove the block device symlinks belonging to the given instance.
854
855   """
856   for idx, disk in enumerate(disks):
857     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
858     if os.path.islink(link_name):
859       try:
860         os.remove(link_name)
861       except OSError:
862         logging.exception("Can't remove symlink '%s'", link_name)
863
864
865 def _GatherAndLinkBlockDevs(instance):
866   """Set up an instance's block device(s).
867
868   This is run on the primary node at instance startup. The block
869   devices must be already assembled.
870
871   @type instance: L{objects.Instance}
872   @param instance: the instance whose disks we shoul assemble
873   @rtype: list
874   @return: list of (disk_object, device_path)
875
876   """
877   block_devices = []
878   for idx, disk in enumerate(instance.disks):
879     device = _RecursiveFindBD(disk)
880     if device is None:
881       raise errors.BlockDeviceError("Block device '%s' is not set up." %
882                                     str(disk))
883     device.Open()
884     try:
885       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
886     except OSError, e:
887       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
888                                     e.strerror)
889
890     block_devices.append((disk, link_name))
891
892   return block_devices
893
894
895 def StartInstance(instance):
896   """Start an instance.
897
898   @type instance: L{objects.Instance}
899   @param instance: the instance object
900   @rtype: boolean
901   @return: whether the startup was successful or not
902
903   """
904   running_instances = GetInstanceList([instance.hypervisor])
905
906   if instance.name in running_instances:
907     return (True, "Already running")
908
909   try:
910     block_devices = _GatherAndLinkBlockDevs(instance)
911     hyper = hypervisor.GetHypervisor(instance.hypervisor)
912     hyper.StartInstance(instance, block_devices)
913   except errors.BlockDeviceError, err:
914     _Fail("Block device error: %s", err, exc=True)
915   except errors.HypervisorError, err:
916     _RemoveBlockDevLinks(instance.name, instance.disks)
917     _Fail("Hypervisor error: %s", err, exc=True)
918
919   return (True, "Instance started successfully")
920
921
922 def InstanceShutdown(instance):
923   """Shut an instance down.
924
925   @note: this functions uses polling with a hardcoded timeout.
926
927   @type instance: L{objects.Instance}
928   @param instance: the instance object
929   @rtype: boolean
930   @return: whether the startup was successful or not
931
932   """
933   hv_name = instance.hypervisor
934   running_instances = GetInstanceList([hv_name])
935
936   if instance.name not in running_instances:
937     return (True, "Instance already stopped")
938
939   hyper = hypervisor.GetHypervisor(hv_name)
940   try:
941     hyper.StopInstance(instance)
942   except errors.HypervisorError, err:
943     _Fail("Failed to stop instance %s: %s", instance.name, err)
944
945   # test every 10secs for 2min
946
947   time.sleep(1)
948   for dummy in range(11):
949     if instance.name not in GetInstanceList([hv_name]):
950       break
951     time.sleep(10)
952   else:
953     # the shutdown did not succeed
954     logging.error("Shutdown of '%s' unsuccessful, using destroy",
955                   instance.name)
956
957     try:
958       hyper.StopInstance(instance, force=True)
959     except errors.HypervisorError, err:
960       _Fail("Failed to force stop instance %s: %s", instance.name, err)
961
962     time.sleep(1)
963     if instance.name in GetInstanceList([hv_name]):
964       _Fail("Could not shutdown instance %s even by destroy", instance.name)
965
966   _RemoveBlockDevLinks(instance.name, instance.disks)
967
968   return (True, "Instance has been shutdown successfully")
969
970
971 def InstanceReboot(instance, reboot_type):
972   """Reboot an instance.
973
974   @type instance: L{objects.Instance}
975   @param instance: the instance object to reboot
976   @type reboot_type: str
977   @param reboot_type: the type of reboot, one the following
978     constants:
979       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
980         instance OS, do not recreate the VM
981       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
982         restart the VM (at the hypervisor level)
983       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
984         is not accepted here, since that mode is handled
985         differently
986   @rtype: boolean
987   @return: the success of the operation
988
989   """
990   running_instances = GetInstanceList([instance.hypervisor])
991
992   if instance.name not in running_instances:
993     _Fail("Cannot reboot instance %s that is not running", instance.name)
994
995   hyper = hypervisor.GetHypervisor(instance.hypervisor)
996   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
997     try:
998       hyper.RebootInstance(instance)
999     except errors.HypervisorError, err:
1000       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1001   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1002     try:
1003       stop_result = InstanceShutdown(instance)
1004       if not stop_result[0]:
1005         return stop_result
1006       return StartInstance(instance)
1007     except errors.HypervisorError, err:
1008       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1009   else:
1010     _Fail("Invalid reboot_type received: %s", reboot_type)
1011
1012   return (True, "Reboot successful")
1013
1014
1015 def MigrationInfo(instance):
1016   """Gather information about an instance to be migrated.
1017
1018   @type instance: L{objects.Instance}
1019   @param instance: the instance definition
1020
1021   """
1022   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1023   try:
1024     info = hyper.MigrationInfo(instance)
1025   except errors.HypervisorError, err:
1026     _Fail("Failed to fetch migration information: %s", err, exc=True)
1027   return (True, info)
1028
1029
1030 def AcceptInstance(instance, info, target):
1031   """Prepare the node to accept an instance.
1032
1033   @type instance: L{objects.Instance}
1034   @param instance: the instance definition
1035   @type info: string/data (opaque)
1036   @param info: migration information, from the source node
1037   @type target: string
1038   @param target: target host (usually ip), on this node
1039
1040   """
1041   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1042   try:
1043     hyper.AcceptInstance(instance, info, target)
1044   except errors.HypervisorError, err:
1045     _Fail("Failed to accept instance: %s", err, exc=True)
1046   return (True, "Accept successfull")
1047
1048
1049 def FinalizeMigration(instance, info, success):
1050   """Finalize any preparation to accept an instance.
1051
1052   @type instance: L{objects.Instance}
1053   @param instance: the instance definition
1054   @type info: string/data (opaque)
1055   @param info: migration information, from the source node
1056   @type success: boolean
1057   @param success: whether the migration was a success or a failure
1058
1059   """
1060   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1061   try:
1062     hyper.FinalizeMigration(instance, info, success)
1063   except errors.HypervisorError, err:
1064     _Fail("Failed to finalize migration: %s", err, exc=True)
1065   return (True, "Migration Finalized")
1066
1067
1068 def MigrateInstance(instance, target, live):
1069   """Migrates an instance to another node.
1070
1071   @type instance: L{objects.Instance}
1072   @param instance: the instance definition
1073   @type target: string
1074   @param target: the target node name
1075   @type live: boolean
1076   @param live: whether the migration should be done live or not (the
1077       interpretation of this parameter is left to the hypervisor)
1078   @rtype: tuple
1079   @return: a tuple of (success, msg) where:
1080       - succes is a boolean denoting the success/failure of the operation
1081       - msg is a string with details in case of failure
1082
1083   """
1084   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1085
1086   try:
1087     hyper.MigrateInstance(instance.name, target, live)
1088   except errors.HypervisorError, err:
1089     _Fail("Failed to migrate instance: %s", err, exc=True)
1090   return (True, "Migration successfull")
1091
1092
1093 def BlockdevCreate(disk, size, owner, on_primary, info):
1094   """Creates a block device for an instance.
1095
1096   @type disk: L{objects.Disk}
1097   @param disk: the object describing the disk we should create
1098   @type size: int
1099   @param size: the size of the physical underlying device, in MiB
1100   @type owner: str
1101   @param owner: the name of the instance for which disk is created,
1102       used for device cache data
1103   @type on_primary: boolean
1104   @param on_primary:  indicates if it is the primary node or not
1105   @type info: string
1106   @param info: string that will be sent to the physical device
1107       creation, used for example to set (LVM) tags on LVs
1108
1109   @return: the new unique_id of the device (this can sometime be
1110       computed only after creation), or None. On secondary nodes,
1111       it's not required to return anything.
1112
1113   """
1114   clist = []
1115   if disk.children:
1116     for child in disk.children:
1117       try:
1118         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1119       except errors.BlockDeviceError, err:
1120         _Fail("Can't assemble device %s: %s", child, err)
1121       if on_primary or disk.AssembleOnSecondary():
1122         # we need the children open in case the device itself has to
1123         # be assembled
1124         try:
1125           crdev.Open()
1126         except errors.BlockDeviceError, err:
1127           _Fail("Can't make child '%s' read-write: %s", child, err)
1128       clist.append(crdev)
1129
1130   try:
1131     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1132   except errors.BlockDeviceError, err:
1133     _Fail("Can't create block device: %s", err)
1134
1135   if on_primary or disk.AssembleOnSecondary():
1136     try:
1137       device.Assemble()
1138     except errors.BlockDeviceError, err:
1139       _Fail("Can't assemble device after creation, unusual event: %s", err)
1140     device.SetSyncSpeed(constants.SYNC_SPEED)
1141     if on_primary or disk.OpenOnSecondary():
1142       try:
1143         device.Open(force=True)
1144       except errors.BlockDeviceError, err:
1145         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1146     DevCacheManager.UpdateCache(device.dev_path, owner,
1147                                 on_primary, disk.iv_name)
1148
1149   device.SetInfo(info)
1150
1151   physical_id = device.unique_id
1152   return True, physical_id
1153
1154
1155 def BlockdevRemove(disk):
1156   """Remove a block device.
1157
1158   @note: This is intended to be called recursively.
1159
1160   @type disk: L{objects.Disk}
1161   @param disk: the disk object we should remove
1162   @rtype: boolean
1163   @return: the success of the operation
1164
1165   """
1166   msgs = []
1167   result = True
1168   try:
1169     rdev = _RecursiveFindBD(disk)
1170   except errors.BlockDeviceError, err:
1171     # probably can't attach
1172     logging.info("Can't attach to device %s in remove", disk)
1173     rdev = None
1174   if rdev is not None:
1175     r_path = rdev.dev_path
1176     try:
1177       rdev.Remove()
1178     except errors.BlockDeviceError, err:
1179       msgs.append(str(err))
1180       result = False
1181     if result:
1182       DevCacheManager.RemoveCache(r_path)
1183
1184   if disk.children:
1185     for child in disk.children:
1186       c_status, c_msg = BlockdevRemove(child)
1187       result = result and c_status
1188       if c_msg: # not an empty message
1189         msgs.append(c_msg)
1190
1191   return (result, "; ".join(msgs))
1192
1193
1194 def _RecursiveAssembleBD(disk, owner, as_primary):
1195   """Activate a block device for an instance.
1196
1197   This is run on the primary and secondary nodes for an instance.
1198
1199   @note: this function is called recursively.
1200
1201   @type disk: L{objects.Disk}
1202   @param disk: the disk we try to assemble
1203   @type owner: str
1204   @param owner: the name of the instance which owns the disk
1205   @type as_primary: boolean
1206   @param as_primary: if we should make the block device
1207       read/write
1208
1209   @return: the assembled device or None (in case no device
1210       was assembled)
1211   @raise errors.BlockDeviceError: in case there is an error
1212       during the activation of the children or the device
1213       itself
1214
1215   """
1216   children = []
1217   if disk.children:
1218     mcn = disk.ChildrenNeeded()
1219     if mcn == -1:
1220       mcn = 0 # max number of Nones allowed
1221     else:
1222       mcn = len(disk.children) - mcn # max number of Nones
1223     for chld_disk in disk.children:
1224       try:
1225         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1226       except errors.BlockDeviceError, err:
1227         if children.count(None) >= mcn:
1228           raise
1229         cdev = None
1230         logging.error("Error in child activation (but continuing): %s",
1231                       str(err))
1232       children.append(cdev)
1233
1234   if as_primary or disk.AssembleOnSecondary():
1235     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1236     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1237     result = r_dev
1238     if as_primary or disk.OpenOnSecondary():
1239       r_dev.Open()
1240     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1241                                 as_primary, disk.iv_name)
1242
1243   else:
1244     result = True
1245   return result
1246
1247
1248 def BlockdevAssemble(disk, owner, as_primary):
1249   """Activate a block device for an instance.
1250
1251   This is a wrapper over _RecursiveAssembleBD.
1252
1253   @rtype: str or boolean
1254   @return: a C{/dev/...} path for primary nodes, and
1255       C{True} for secondary nodes
1256
1257   """
1258   status = True
1259   result = "no error information"
1260   try:
1261     result = _RecursiveAssembleBD(disk, owner, as_primary)
1262     if isinstance(result, bdev.BlockDev):
1263       result = result.dev_path
1264   except errors.BlockDeviceError, err:
1265     result = "Error while assembling disk: %s" % str(err)
1266     status = False
1267   return (status, result)
1268
1269
1270 def BlockdevShutdown(disk):
1271   """Shut down a block device.
1272
1273   First, if the device is assembled (Attach() is successfull), then
1274   the device is shutdown. Then the children of the device are
1275   shutdown.
1276
1277   This function is called recursively. Note that we don't cache the
1278   children or such, as oppossed to assemble, shutdown of different
1279   devices doesn't require that the upper device was active.
1280
1281   @type disk: L{objects.Disk}
1282   @param disk: the description of the disk we should
1283       shutdown
1284   @rtype: boolean
1285   @return: the success of the operation
1286
1287   """
1288   msgs = []
1289   result = True
1290   r_dev = _RecursiveFindBD(disk)
1291   if r_dev is not None:
1292     r_path = r_dev.dev_path
1293     try:
1294       r_dev.Shutdown()
1295       DevCacheManager.RemoveCache(r_path)
1296     except errors.BlockDeviceError, err:
1297       msgs.append(str(err))
1298       result = False
1299
1300   if disk.children:
1301     for child in disk.children:
1302       c_status, c_msg = BlockdevShutdown(child)
1303       result = result and c_status
1304       if c_msg: # not an empty message
1305         msgs.append(c_msg)
1306
1307   return (result, "; ".join(msgs))
1308
1309
1310 def BlockdevAddchildren(parent_cdev, new_cdevs):
1311   """Extend a mirrored block device.
1312
1313   @type parent_cdev: L{objects.Disk}
1314   @param parent_cdev: the disk to which we should add children
1315   @type new_cdevs: list of L{objects.Disk}
1316   @param new_cdevs: the list of children which we should add
1317   @rtype: boolean
1318   @return: the success of the operation
1319
1320   """
1321   parent_bdev = _RecursiveFindBD(parent_cdev)
1322   if parent_bdev is None:
1323     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1324   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1325   if new_bdevs.count(None) > 0:
1326     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1327   parent_bdev.AddChildren(new_bdevs)
1328   return (True, None)
1329
1330
1331 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1332   """Shrink a mirrored block device.
1333
1334   @type parent_cdev: L{objects.Disk}
1335   @param parent_cdev: the disk from which we should remove children
1336   @type new_cdevs: list of L{objects.Disk}
1337   @param new_cdevs: the list of children which we should remove
1338   @rtype: boolean
1339   @return: the success of the operation
1340
1341   """
1342   parent_bdev = _RecursiveFindBD(parent_cdev)
1343   if parent_bdev is None:
1344     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1345   devs = []
1346   for disk in new_cdevs:
1347     rpath = disk.StaticDevPath()
1348     if rpath is None:
1349       bd = _RecursiveFindBD(disk)
1350       if bd is None:
1351         _Fail("Can't find device %s while removing children", disk)
1352       else:
1353         devs.append(bd.dev_path)
1354     else:
1355       devs.append(rpath)
1356   parent_bdev.RemoveChildren(devs)
1357   return (True, None)
1358
1359
1360 def BlockdevGetmirrorstatus(disks):
1361   """Get the mirroring status of a list of devices.
1362
1363   @type disks: list of L{objects.Disk}
1364   @param disks: the list of disks which we should query
1365   @rtype: disk
1366   @return:
1367       a list of (mirror_done, estimated_time) tuples, which
1368       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1369   @raise errors.BlockDeviceError: if any of the disks cannot be
1370       found
1371
1372   """
1373   stats = []
1374   for dsk in disks:
1375     rbd = _RecursiveFindBD(dsk)
1376     if rbd is None:
1377       _Fail("Can't find device %s", dsk)
1378     stats.append(rbd.CombinedSyncStatus())
1379   return True, stats
1380
1381
1382 def _RecursiveFindBD(disk):
1383   """Check if a device is activated.
1384
1385   If so, return informations about the real device.
1386
1387   @type disk: L{objects.Disk}
1388   @param disk: the disk object we need to find
1389
1390   @return: None if the device can't be found,
1391       otherwise the device instance
1392
1393   """
1394   children = []
1395   if disk.children:
1396     for chdisk in disk.children:
1397       children.append(_RecursiveFindBD(chdisk))
1398
1399   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1400
1401
1402 def BlockdevFind(disk):
1403   """Check if a device is activated.
1404
1405   If it is, return informations about the real device.
1406
1407   @type disk: L{objects.Disk}
1408   @param disk: the disk to find
1409   @rtype: None or tuple
1410   @return: None if the disk cannot be found, otherwise a
1411       tuple (device_path, major, minor, sync_percent,
1412       estimated_time, is_degraded)
1413
1414   """
1415   try:
1416     rbd = _RecursiveFindBD(disk)
1417   except errors.BlockDeviceError, err:
1418     _Fail("Failed to find device: %s", err, exc=True)
1419   if rbd is None:
1420     return (True, None)
1421   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1422
1423
1424 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1425   """Write a file to the filesystem.
1426
1427   This allows the master to overwrite(!) a file. It will only perform
1428   the operation if the file belongs to a list of configuration files.
1429
1430   @type file_name: str
1431   @param file_name: the target file name
1432   @type data: str
1433   @param data: the new contents of the file
1434   @type mode: int
1435   @param mode: the mode to give the file (can be None)
1436   @type uid: int
1437   @param uid: the owner of the file (can be -1 for default)
1438   @type gid: int
1439   @param gid: the group of the file (can be -1 for default)
1440   @type atime: float
1441   @param atime: the atime to set on the file (can be None)
1442   @type mtime: float
1443   @param mtime: the mtime to set on the file (can be None)
1444   @rtype: boolean
1445   @return: the success of the operation; errors are logged
1446       in the node daemon log
1447
1448   """
1449   if not os.path.isabs(file_name):
1450     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1451
1452   allowed_files = set([
1453     constants.CLUSTER_CONF_FILE,
1454     constants.ETC_HOSTS,
1455     constants.SSH_KNOWN_HOSTS_FILE,
1456     constants.VNC_PASSWORD_FILE,
1457     constants.RAPI_CERT_FILE,
1458     constants.RAPI_USERS_FILE,
1459     ])
1460
1461   for hv_name in constants.HYPER_TYPES:
1462     hv_class = hypervisor.GetHypervisor(hv_name)
1463     allowed_files.update(hv_class.GetAncillaryFiles())
1464
1465   if file_name not in allowed_files:
1466     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1467           file_name)
1468
1469   raw_data = _Decompress(data)
1470
1471   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1472                   atime=atime, mtime=mtime)
1473   return (True, "success")
1474
1475
1476 def WriteSsconfFiles(values):
1477   """Update all ssconf files.
1478
1479   Wrapper around the SimpleStore.WriteFiles.
1480
1481   """
1482   ssconf.SimpleStore().WriteFiles(values)
1483
1484
1485 def _ErrnoOrStr(err):
1486   """Format an EnvironmentError exception.
1487
1488   If the L{err} argument has an errno attribute, it will be looked up
1489   and converted into a textual C{E...} description. Otherwise the
1490   string representation of the error will be returned.
1491
1492   @type err: L{EnvironmentError}
1493   @param err: the exception to format
1494
1495   """
1496   if hasattr(err, 'errno'):
1497     detail = errno.errorcode[err.errno]
1498   else:
1499     detail = str(err)
1500   return detail
1501
1502
1503 def _OSOndiskVersion(name, os_dir):
1504   """Compute and return the API version of a given OS.
1505
1506   This function will try to read the API version of the OS given by
1507   the 'name' parameter and residing in the 'os_dir' directory.
1508
1509   @type name: str
1510   @param name: the OS name we should look for
1511   @type os_dir: str
1512   @param os_dir: the directory inwhich we should look for the OS
1513   @rtype: int or None
1514   @return:
1515       Either an integer denoting the version or None in the
1516       case when this is not a valid OS name.
1517   @raise errors.InvalidOS: if the OS cannot be found
1518
1519   """
1520   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1521
1522   try:
1523     st = os.stat(api_file)
1524   except EnvironmentError, err:
1525     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1526                            " found (%s)" % _ErrnoOrStr(err))
1527
1528   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1529     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1530                            " a regular file")
1531
1532   try:
1533     f = open(api_file)
1534     try:
1535       api_versions = f.readlines()
1536     finally:
1537       f.close()
1538   except EnvironmentError, err:
1539     raise errors.InvalidOS(name, os_dir, "error while reading the"
1540                            " API version (%s)" % _ErrnoOrStr(err))
1541
1542   api_versions = [version.strip() for version in api_versions]
1543   try:
1544     api_versions = [int(version) for version in api_versions]
1545   except (TypeError, ValueError), err:
1546     raise errors.InvalidOS(name, os_dir,
1547                            "API version is not integer (%s)" % str(err))
1548
1549   return api_versions
1550
1551
1552 def DiagnoseOS(top_dirs=None):
1553   """Compute the validity for all OSes.
1554
1555   @type top_dirs: list
1556   @param top_dirs: the list of directories in which to
1557       search (if not given defaults to
1558       L{constants.OS_SEARCH_PATH})
1559   @rtype: list of L{objects.OS}
1560   @return: an OS object for each name in all the given
1561       directories
1562
1563   """
1564   if top_dirs is None:
1565     top_dirs = constants.OS_SEARCH_PATH
1566
1567   result = []
1568   for dir_name in top_dirs:
1569     if os.path.isdir(dir_name):
1570       try:
1571         f_names = utils.ListVisibleFiles(dir_name)
1572       except EnvironmentError, err:
1573         logging.exception("Can't list the OS directory %s", dir_name)
1574         break
1575       for name in f_names:
1576         try:
1577           os_inst = OSFromDisk(name, base_dir=dir_name)
1578           result.append(os_inst)
1579         except errors.InvalidOS, err:
1580           result.append(objects.OS.FromInvalidOS(err))
1581
1582   return result
1583
1584
1585 def OSFromDisk(name, base_dir=None):
1586   """Create an OS instance from disk.
1587
1588   This function will return an OS instance if the given name is a
1589   valid OS name. Otherwise, it will raise an appropriate
1590   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1591
1592   @type base_dir: string
1593   @keyword base_dir: Base directory containing OS installations.
1594                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1595   @rtype: L{objects.OS}
1596   @return: the OS instance if we find a valid one
1597   @raise errors.InvalidOS: if we don't find a valid OS
1598
1599   """
1600   if base_dir is None:
1601     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1602     if os_dir is None:
1603       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1604   else:
1605     os_dir = os.path.sep.join([base_dir, name])
1606
1607   api_versions = _OSOndiskVersion(name, os_dir)
1608
1609   if constants.OS_API_VERSION not in api_versions:
1610     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1611                            " (found %s want %s)"
1612                            % (api_versions, constants.OS_API_VERSION))
1613
1614   # OS Scripts dictionary, we will populate it with the actual script names
1615   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1616
1617   for script in os_scripts:
1618     os_scripts[script] = os.path.sep.join([os_dir, script])
1619
1620     try:
1621       st = os.stat(os_scripts[script])
1622     except EnvironmentError, err:
1623       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1624                              (script, _ErrnoOrStr(err)))
1625
1626     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1627       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1628                              script)
1629
1630     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1631       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1632                              script)
1633
1634
1635   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1636                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1637                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1638                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1639                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1640                     api_versions=api_versions)
1641
1642 def OSEnvironment(instance, debug=0):
1643   """Calculate the environment for an os script.
1644
1645   @type instance: L{objects.Instance}
1646   @param instance: target instance for the os script run
1647   @type debug: integer
1648   @param debug: debug level (0 or 1, for OS Api 10)
1649   @rtype: dict
1650   @return: dict of environment variables
1651   @raise errors.BlockDeviceError: if the block device
1652       cannot be found
1653
1654   """
1655   result = {}
1656   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1657   result['INSTANCE_NAME'] = instance.name
1658   result['INSTANCE_OS'] = instance.os
1659   result['HYPERVISOR'] = instance.hypervisor
1660   result['DISK_COUNT'] = '%d' % len(instance.disks)
1661   result['NIC_COUNT'] = '%d' % len(instance.nics)
1662   result['DEBUG_LEVEL'] = '%d' % debug
1663   for idx, disk in enumerate(instance.disks):
1664     real_disk = _RecursiveFindBD(disk)
1665     if real_disk is None:
1666       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1667                                     str(disk))
1668     real_disk.Open()
1669     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1670     result['DISK_%d_ACCESS' % idx] = disk.mode
1671     if constants.HV_DISK_TYPE in instance.hvparams:
1672       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1673         instance.hvparams[constants.HV_DISK_TYPE]
1674     if disk.dev_type in constants.LDS_BLOCK:
1675       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1676     elif disk.dev_type == constants.LD_FILE:
1677       result['DISK_%d_BACKEND_TYPE' % idx] = \
1678         'file:%s' % disk.physical_id[0]
1679   for idx, nic in enumerate(instance.nics):
1680     result['NIC_%d_MAC' % idx] = nic.mac
1681     if nic.ip:
1682       result['NIC_%d_IP' % idx] = nic.ip
1683     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1684     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1685       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1686     if nic.nicparams[constants.NIC_LINK]:
1687       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1688     if constants.HV_NIC_TYPE in instance.hvparams:
1689       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1690         instance.hvparams[constants.HV_NIC_TYPE]
1691
1692   return result
1693
1694 def BlockdevGrow(disk, amount):
1695   """Grow a stack of block devices.
1696
1697   This function is called recursively, with the childrens being the
1698   first ones to resize.
1699
1700   @type disk: L{objects.Disk}
1701   @param disk: the disk to be grown
1702   @rtype: (status, result)
1703   @return: a tuple with the status of the operation
1704       (True/False), and the errors message if status
1705       is False
1706
1707   """
1708   r_dev = _RecursiveFindBD(disk)
1709   if r_dev is None:
1710     return False, "Cannot find block device %s" % (disk,)
1711
1712   try:
1713     r_dev.Grow(amount)
1714   except errors.BlockDeviceError, err:
1715     _Fail("Failed to grow block device: %s", err, exc=True)
1716
1717   return True, None
1718
1719
1720 def BlockdevSnapshot(disk):
1721   """Create a snapshot copy of a block device.
1722
1723   This function is called recursively, and the snapshot is actually created
1724   just for the leaf lvm backend device.
1725
1726   @type disk: L{objects.Disk}
1727   @param disk: the disk to be snapshotted
1728   @rtype: string
1729   @return: snapshot disk path
1730
1731   """
1732   if disk.children:
1733     if len(disk.children) == 1:
1734       # only one child, let's recurse on it
1735       return BlockdevSnapshot(disk.children[0])
1736     else:
1737       # more than one child, choose one that matches
1738       for child in disk.children:
1739         if child.size == disk.size:
1740           # return implies breaking the loop
1741           return BlockdevSnapshot(child)
1742   elif disk.dev_type == constants.LD_LV:
1743     r_dev = _RecursiveFindBD(disk)
1744     if r_dev is not None:
1745       # let's stay on the safe side and ask for the full size, for now
1746       return True, r_dev.Snapshot(disk.size)
1747     else:
1748       _Fail("Cannot find block device %s", disk)
1749   else:
1750     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1751           disk.unique_id, disk.dev_type)
1752
1753
1754 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1755   """Export a block device snapshot to a remote node.
1756
1757   @type disk: L{objects.Disk}
1758   @param disk: the description of the disk to export
1759   @type dest_node: str
1760   @param dest_node: the destination node to export to
1761   @type instance: L{objects.Instance}
1762   @param instance: the instance object to whom the disk belongs
1763   @type cluster_name: str
1764   @param cluster_name: the cluster name, needed for SSH hostalias
1765   @type idx: int
1766   @param idx: the index of the disk in the instance's disk list,
1767       used to export to the OS scripts environment
1768   @rtype: boolean
1769   @return: the success of the operation
1770
1771   """
1772   export_env = OSEnvironment(instance)
1773
1774   inst_os = OSFromDisk(instance.os)
1775   export_script = inst_os.export_script
1776
1777   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1778                                      instance.name, int(time.time()))
1779   if not os.path.exists(constants.LOG_OS_DIR):
1780     os.mkdir(constants.LOG_OS_DIR, 0750)
1781   real_disk = _RecursiveFindBD(disk)
1782   if real_disk is None:
1783     _Fail("Block device '%s' is not set up", disk)
1784
1785   real_disk.Open()
1786
1787   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1788   export_env['EXPORT_INDEX'] = str(idx)
1789
1790   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1791   destfile = disk.physical_id[1]
1792
1793   # the target command is built out of three individual commands,
1794   # which are joined by pipes; we check each individual command for
1795   # valid parameters
1796   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1797                                export_script, logfile)
1798
1799   comprcmd = "gzip"
1800
1801   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1802                                 destdir, destdir, destfile)
1803   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1804                                                    constants.GANETI_RUNAS,
1805                                                    destcmd)
1806
1807   # all commands have been checked, so we're safe to combine them
1808   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1809
1810   result = utils.RunCmd(command, env=export_env)
1811
1812   if result.failed:
1813     _Fail("OS snapshot export command '%s' returned error: %s"
1814           " output: %s", command, result.fail_reason, result.output)
1815
1816   return (True, None)
1817
1818
1819 def FinalizeExport(instance, snap_disks):
1820   """Write out the export configuration information.
1821
1822   @type instance: L{objects.Instance}
1823   @param instance: the instance which we export, used for
1824       saving configuration
1825   @type snap_disks: list of L{objects.Disk}
1826   @param snap_disks: list of snapshot block devices, which
1827       will be used to get the actual name of the dump file
1828
1829   @rtype: boolean
1830   @return: the success of the operation
1831
1832   """
1833   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1834   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1835
1836   config = objects.SerializableConfigParser()
1837
1838   config.add_section(constants.INISECT_EXP)
1839   config.set(constants.INISECT_EXP, 'version', '0')
1840   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1841   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1842   config.set(constants.INISECT_EXP, 'os', instance.os)
1843   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1844
1845   config.add_section(constants.INISECT_INS)
1846   config.set(constants.INISECT_INS, 'name', instance.name)
1847   config.set(constants.INISECT_INS, 'memory', '%d' %
1848              instance.beparams[constants.BE_MEMORY])
1849   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1850              instance.beparams[constants.BE_VCPUS])
1851   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1852
1853   nic_total = 0
1854   for nic_count, nic in enumerate(instance.nics):
1855     nic_total += 1
1856     config.set(constants.INISECT_INS, 'nic%d_mac' %
1857                nic_count, '%s' % nic.mac)
1858     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1859     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1860                '%s' % nic.bridge)
1861   # TODO: redundant: on load can read nics until it doesn't exist
1862   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1863
1864   disk_total = 0
1865   for disk_count, disk in enumerate(snap_disks):
1866     if disk:
1867       disk_total += 1
1868       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1869                  ('%s' % disk.iv_name))
1870       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1871                  ('%s' % disk.physical_id[1]))
1872       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1873                  ('%d' % disk.size))
1874
1875   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1876
1877   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1878                   data=config.Dumps())
1879   shutil.rmtree(finaldestdir, True)
1880   shutil.move(destdir, finaldestdir)
1881
1882   return True, None
1883
1884
1885 def ExportInfo(dest):
1886   """Get export configuration information.
1887
1888   @type dest: str
1889   @param dest: directory containing the export
1890
1891   @rtype: L{objects.SerializableConfigParser}
1892   @return: a serializable config file containing the
1893       export info
1894
1895   """
1896   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1897
1898   config = objects.SerializableConfigParser()
1899   config.read(cff)
1900
1901   if (not config.has_section(constants.INISECT_EXP) or
1902       not config.has_section(constants.INISECT_INS)):
1903     _Fail("Export info file doesn't have the required fields")
1904
1905   return True, config.Dumps()
1906
1907
1908 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1909   """Import an os image into an instance.
1910
1911   @type instance: L{objects.Instance}
1912   @param instance: instance to import the disks into
1913   @type src_node: string
1914   @param src_node: source node for the disk images
1915   @type src_images: list of string
1916   @param src_images: absolute paths of the disk images
1917   @rtype: list of boolean
1918   @return: each boolean represent the success of importing the n-th disk
1919
1920   """
1921   import_env = OSEnvironment(instance)
1922   inst_os = OSFromDisk(instance.os)
1923   import_script = inst_os.import_script
1924
1925   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1926                                         instance.name, int(time.time()))
1927   if not os.path.exists(constants.LOG_OS_DIR):
1928     os.mkdir(constants.LOG_OS_DIR, 0750)
1929
1930   comprcmd = "gunzip"
1931   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1932                                import_script, logfile)
1933
1934   final_result = []
1935   for idx, image in enumerate(src_images):
1936     if image:
1937       destcmd = utils.BuildShellCmd('cat %s', image)
1938       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1939                                                        constants.GANETI_RUNAS,
1940                                                        destcmd)
1941       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1942       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1943       import_env['IMPORT_INDEX'] = str(idx)
1944       result = utils.RunCmd(command, env=import_env)
1945       if result.failed:
1946         logging.error("Disk import command '%s' returned error: %s"
1947                       " output: %s", command, result.fail_reason,
1948                       result.output)
1949         final_result.append(False)
1950       else:
1951         final_result.append(True)
1952     else:
1953       final_result.append(True)
1954
1955   return final_result
1956
1957
1958 def ListExports():
1959   """Return a list of exports currently available on this machine.
1960
1961   @rtype: list
1962   @return: list of the exports
1963
1964   """
1965   if os.path.isdir(constants.EXPORT_DIR):
1966     return True, utils.ListVisibleFiles(constants.EXPORT_DIR)
1967   else:
1968     return False, "No exports directory"
1969
1970
1971 def RemoveExport(export):
1972   """Remove an existing export from the node.
1973
1974   @type export: str
1975   @param export: the name of the export to remove
1976   @rtype: boolean
1977   @return: the success of the operation
1978
1979   """
1980   target = os.path.join(constants.EXPORT_DIR, export)
1981
1982   shutil.rmtree(target)
1983   # TODO: catch some of the relevant exceptions and provide a pretty
1984   # error message if rmtree fails.
1985
1986   return True
1987
1988
1989 def BlockdevRename(devlist):
1990   """Rename a list of block devices.
1991
1992   @type devlist: list of tuples
1993   @param devlist: list of tuples of the form  (disk,
1994       new_logical_id, new_physical_id); disk is an
1995       L{objects.Disk} object describing the current disk,
1996       and new logical_id/physical_id is the name we
1997       rename it to
1998   @rtype: boolean
1999   @return: True if all renames succeeded, False otherwise
2000
2001   """
2002   msgs = []
2003   result = True
2004   for disk, unique_id in devlist:
2005     dev = _RecursiveFindBD(disk)
2006     if dev is None:
2007       msgs.append("Can't find device %s in rename" % str(disk))
2008       result = False
2009       continue
2010     try:
2011       old_rpath = dev.dev_path
2012       dev.Rename(unique_id)
2013       new_rpath = dev.dev_path
2014       if old_rpath != new_rpath:
2015         DevCacheManager.RemoveCache(old_rpath)
2016         # FIXME: we should add the new cache information here, like:
2017         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2018         # but we don't have the owner here - maybe parse from existing
2019         # cache? for now, we only lose lvm data when we rename, which
2020         # is less critical than DRBD or MD
2021     except errors.BlockDeviceError, err:
2022       msgs.append("Can't rename device '%s' to '%s': %s" %
2023                   (dev, unique_id, err))
2024       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2025       result = False
2026   return (result, "; ".join(msgs))
2027
2028
2029 def _TransformFileStorageDir(file_storage_dir):
2030   """Checks whether given file_storage_dir is valid.
2031
2032   Checks wheter the given file_storage_dir is within the cluster-wide
2033   default file_storage_dir stored in SimpleStore. Only paths under that
2034   directory are allowed.
2035
2036   @type file_storage_dir: str
2037   @param file_storage_dir: the path to check
2038
2039   @return: the normalized path if valid, None otherwise
2040
2041   """
2042   cfg = _GetConfig()
2043   file_storage_dir = os.path.normpath(file_storage_dir)
2044   base_file_storage_dir = cfg.GetFileStorageDir()
2045   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2046       base_file_storage_dir):
2047     logging.error("file storage directory '%s' is not under base file"
2048                   " storage directory '%s'",
2049                   file_storage_dir, base_file_storage_dir)
2050     return None
2051   return file_storage_dir
2052
2053
2054 def CreateFileStorageDir(file_storage_dir):
2055   """Create file storage directory.
2056
2057   @type file_storage_dir: str
2058   @param file_storage_dir: directory to create
2059
2060   @rtype: tuple
2061   @return: tuple with first element a boolean indicating wheter dir
2062       creation was successful or not
2063
2064   """
2065   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2066   result = True,
2067   if not file_storage_dir:
2068     result = False,
2069   else:
2070     if os.path.exists(file_storage_dir):
2071       if not os.path.isdir(file_storage_dir):
2072         logging.error("'%s' is not a directory", file_storage_dir)
2073         result = False,
2074     else:
2075       try:
2076         os.makedirs(file_storage_dir, 0750)
2077       except OSError, err:
2078         logging.error("Cannot create file storage directory '%s': %s",
2079                       file_storage_dir, err)
2080         result = False,
2081   return result
2082
2083
2084 def RemoveFileStorageDir(file_storage_dir):
2085   """Remove file storage directory.
2086
2087   Remove it only if it's empty. If not log an error and return.
2088
2089   @type file_storage_dir: str
2090   @param file_storage_dir: the directory we should cleanup
2091   @rtype: tuple (success,)
2092   @return: tuple of one element, C{success}, denoting
2093       whether the operation was successfull
2094
2095   """
2096   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2097   result = True,
2098   if not file_storage_dir:
2099     result = False,
2100   else:
2101     if os.path.exists(file_storage_dir):
2102       if not os.path.isdir(file_storage_dir):
2103         logging.error("'%s' is not a directory", file_storage_dir)
2104         result = False,
2105       # deletes dir only if empty, otherwise we want to return False
2106       try:
2107         os.rmdir(file_storage_dir)
2108       except OSError, err:
2109         logging.exception("Cannot remove file storage directory '%s'",
2110                           file_storage_dir)
2111         result = False,
2112   return result
2113
2114
2115 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2116   """Rename the file storage directory.
2117
2118   @type old_file_storage_dir: str
2119   @param old_file_storage_dir: the current path
2120   @type new_file_storage_dir: str
2121   @param new_file_storage_dir: the name we should rename to
2122   @rtype: tuple (success,)
2123   @return: tuple of one element, C{success}, denoting
2124       whether the operation was successful
2125
2126   """
2127   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2128   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2129   result = True,
2130   if not old_file_storage_dir or not new_file_storage_dir:
2131     result = False,
2132   else:
2133     if not os.path.exists(new_file_storage_dir):
2134       if os.path.isdir(old_file_storage_dir):
2135         try:
2136           os.rename(old_file_storage_dir, new_file_storage_dir)
2137         except OSError, err:
2138           logging.exception("Cannot rename '%s' to '%s'",
2139                             old_file_storage_dir, new_file_storage_dir)
2140           result =  False,
2141       else:
2142         logging.error("'%s' is not a directory", old_file_storage_dir)
2143         result = False,
2144     else:
2145       if os.path.exists(old_file_storage_dir):
2146         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2147                       old_file_storage_dir, new_file_storage_dir)
2148         result = False,
2149   return result
2150
2151
2152 def _IsJobQueueFile(file_name):
2153   """Checks whether the given filename is in the queue directory.
2154
2155   @type file_name: str
2156   @param file_name: the file name we should check
2157   @rtype: boolean
2158   @return: whether the file is under the queue directory
2159
2160   """
2161   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2162   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2163
2164   if not result:
2165     logging.error("'%s' is not a file in the queue directory",
2166                   file_name)
2167
2168   return result
2169
2170
2171 def JobQueueUpdate(file_name, content):
2172   """Updates a file in the queue directory.
2173
2174   This is just a wrapper over L{utils.WriteFile}, with proper
2175   checking.
2176
2177   @type file_name: str
2178   @param file_name: the job file name
2179   @type content: str
2180   @param content: the new job contents
2181   @rtype: boolean
2182   @return: the success of the operation
2183
2184   """
2185   if not _IsJobQueueFile(file_name):
2186     return False
2187
2188   # Write and replace the file atomically
2189   utils.WriteFile(file_name, data=_Decompress(content))
2190
2191   return True
2192
2193
2194 def JobQueueRename(old, new):
2195   """Renames a job queue file.
2196
2197   This is just a wrapper over os.rename with proper checking.
2198
2199   @type old: str
2200   @param old: the old (actual) file name
2201   @type new: str
2202   @param new: the desired file name
2203   @rtype: boolean
2204   @return: the success of the operation
2205
2206   """
2207   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2208     return False
2209
2210   utils.RenameFile(old, new, mkdir=True)
2211
2212   return True
2213
2214
2215 def JobQueueSetDrainFlag(drain_flag):
2216   """Set the drain flag for the queue.
2217
2218   This will set or unset the queue drain flag.
2219
2220   @type drain_flag: boolean
2221   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2222   @rtype: boolean
2223   @return: always True
2224   @warning: the function always returns True
2225
2226   """
2227   if drain_flag:
2228     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2229   else:
2230     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2231
2232   return True
2233
2234
2235 def BlockdevClose(instance_name, disks):
2236   """Closes the given block devices.
2237
2238   This means they will be switched to secondary mode (in case of
2239   DRBD).
2240
2241   @param instance_name: if the argument is not empty, the symlinks
2242       of this instance will be removed
2243   @type disks: list of L{objects.Disk}
2244   @param disks: the list of disks to be closed
2245   @rtype: tuple (success, message)
2246   @return: a tuple of success and message, where success
2247       indicates the succes of the operation, and message
2248       which will contain the error details in case we
2249       failed
2250
2251   """
2252   bdevs = []
2253   for cf in disks:
2254     rd = _RecursiveFindBD(cf)
2255     if rd is None:
2256       _Fail("Can't find device %s", cf)
2257     bdevs.append(rd)
2258
2259   msg = []
2260   for rd in bdevs:
2261     try:
2262       rd.Close()
2263     except errors.BlockDeviceError, err:
2264       msg.append(str(err))
2265   if msg:
2266     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2267   else:
2268     if instance_name:
2269       _RemoveBlockDevLinks(instance_name, disks)
2270     return (True, "All devices secondary")
2271
2272
2273 def ValidateHVParams(hvname, hvparams):
2274   """Validates the given hypervisor parameters.
2275
2276   @type hvname: string
2277   @param hvname: the hypervisor name
2278   @type hvparams: dict
2279   @param hvparams: the hypervisor parameters to be validated
2280   @rtype: tuple (success, message)
2281   @return: a tuple of success and message, where success
2282       indicates the succes of the operation, and message
2283       which will contain the error details in case we
2284       failed
2285
2286   """
2287   try:
2288     hv_type = hypervisor.GetHypervisor(hvname)
2289     hv_type.ValidateParameters(hvparams)
2290     return (True, "Validation passed")
2291   except errors.HypervisorError, err:
2292     return (False, str(err))
2293
2294
2295 def DemoteFromMC():
2296   """Demotes the current node from master candidate role.
2297
2298   """
2299   # try to ensure we're not the master by mistake
2300   master, myself = ssconf.GetMasterAndMyself()
2301   if master == myself:
2302     return (False, "ssconf status shows I'm the master node, will not demote")
2303   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2304   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2305     return (False, "The master daemon is running, will not demote")
2306   try:
2307     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2308   except EnvironmentError, err:
2309     if err.errno != errno.ENOENT:
2310       return (False, "Error while backing up cluster file: %s" % str(err))
2311   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2312   return (True, "Done")
2313
2314
2315 def _FindDisks(nodes_ip, disks):
2316   """Sets the physical ID on disks and returns the block devices.
2317
2318   """
2319   # set the correct physical ID
2320   my_name = utils.HostInfo().name
2321   for cf in disks:
2322     cf.SetPhysicalID(my_name, nodes_ip)
2323
2324   bdevs = []
2325
2326   for cf in disks:
2327     rd = _RecursiveFindBD(cf)
2328     if rd is None:
2329       return (False, "Can't find device %s" % cf)
2330     bdevs.append(rd)
2331   return (True, bdevs)
2332
2333
2334 def DrbdDisconnectNet(nodes_ip, disks):
2335   """Disconnects the network on a list of drbd devices.
2336
2337   """
2338   status, bdevs = _FindDisks(nodes_ip, disks)
2339   if not status:
2340     return status, bdevs
2341
2342   # disconnect disks
2343   for rd in bdevs:
2344     try:
2345       rd.DisconnectNet()
2346     except errors.BlockDeviceError, err:
2347       _Fail("Can't change network configuration to standalone mode: %s",
2348             err, exc=True)
2349   return (True, "All disks are now disconnected")
2350
2351
2352 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2353   """Attaches the network on a list of drbd devices.
2354
2355   """
2356   status, bdevs = _FindDisks(nodes_ip, disks)
2357   if not status:
2358     return status, bdevs
2359
2360   if multimaster:
2361     for idx, rd in enumerate(bdevs):
2362       try:
2363         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2364       except EnvironmentError, err:
2365         _Fail("Can't create symlink: %s", err)
2366   # reconnect disks, switch to new master configuration and if
2367   # needed primary mode
2368   for rd in bdevs:
2369     try:
2370       rd.AttachNet(multimaster)
2371     except errors.BlockDeviceError, err:
2372       _Fail("Can't change network configuration: %s", err)
2373   # wait until the disks are connected; we need to retry the re-attach
2374   # if the device becomes standalone, as this might happen if the one
2375   # node disconnects and reconnects in a different mode before the
2376   # other node reconnects; in this case, one or both of the nodes will
2377   # decide it has wrong configuration and switch to standalone
2378   RECONNECT_TIMEOUT = 2 * 60
2379   sleep_time = 0.100 # start with 100 miliseconds
2380   timeout_limit = time.time() + RECONNECT_TIMEOUT
2381   while time.time() < timeout_limit:
2382     all_connected = True
2383     for rd in bdevs:
2384       stats = rd.GetProcStatus()
2385       if not (stats.is_connected or stats.is_in_resync):
2386         all_connected = False
2387       if stats.is_standalone:
2388         # peer had different config info and this node became
2389         # standalone, even though this should not happen with the
2390         # new staged way of changing disk configs
2391         try:
2392           rd.ReAttachNet(multimaster)
2393         except errors.BlockDeviceError, err:
2394           _Fail("Can't change network configuration: %s", err)
2395     if all_connected:
2396       break
2397     time.sleep(sleep_time)
2398     sleep_time = min(5, sleep_time * 1.5)
2399   if not all_connected:
2400     return (False, "Timeout in disk reconnecting")
2401   if multimaster:
2402     # change to primary mode
2403     for rd in bdevs:
2404       try:
2405         rd.Open()
2406       except errors.BlockDeviceError, err:
2407         _Fail("Can't change to primary mode: %s", err)
2408   if multimaster:
2409     msg = "multi-master and primary"
2410   else:
2411     msg = "single-master"
2412   return (True, "Disks are now configured as %s" % msg)
2413
2414
2415 def DrbdWaitSync(nodes_ip, disks):
2416   """Wait until DRBDs have synchronized.
2417
2418   """
2419   status, bdevs = _FindDisks(nodes_ip, disks)
2420   if not status:
2421     return status, bdevs
2422
2423   min_resync = 100
2424   alldone = True
2425   failure = False
2426   for rd in bdevs:
2427     stats = rd.GetProcStatus()
2428     if not (stats.is_connected or stats.is_in_resync):
2429       failure = True
2430       break
2431     alldone = alldone and (not stats.is_in_resync)
2432     if stats.sync_percent is not None:
2433       min_resync = min(min_resync, stats.sync_percent)
2434   return (not failure, (alldone, min_resync))
2435
2436
2437 def PowercycleNode(hypervisor_type):
2438   """Hard-powercycle the node.
2439
2440   Because we need to return first, and schedule the powercycle in the
2441   background, we won't be able to report failures nicely.
2442
2443   """
2444   hyper = hypervisor.GetHypervisor(hypervisor_type)
2445   try:
2446     pid = os.fork()
2447   except OSError, err:
2448     # if we can't fork, we'll pretend that we're in the child process
2449     pid = 0
2450   if pid > 0:
2451     return (True, "Reboot scheduled in 5 seconds")
2452   time.sleep(5)
2453   hyper.PowercycleNode()
2454
2455
2456 class HooksRunner(object):
2457   """Hook runner.
2458
2459   This class is instantiated on the node side (ganeti-noded) and not
2460   on the master side.
2461
2462   """
2463   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2464
2465   def __init__(self, hooks_base_dir=None):
2466     """Constructor for hooks runner.
2467
2468     @type hooks_base_dir: str or None
2469     @param hooks_base_dir: if not None, this overrides the
2470         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2471
2472     """
2473     if hooks_base_dir is None:
2474       hooks_base_dir = constants.HOOKS_BASE_DIR
2475     self._BASE_DIR = hooks_base_dir
2476
2477   @staticmethod
2478   def ExecHook(script, env):
2479     """Exec one hook script.
2480
2481     @type script: str
2482     @param script: the full path to the script
2483     @type env: dict
2484     @param env: the environment with which to exec the script
2485     @rtype: tuple (success, message)
2486     @return: a tuple of success and message, where success
2487         indicates the succes of the operation, and message
2488         which will contain the error details in case we
2489         failed
2490
2491     """
2492     # exec the process using subprocess and log the output
2493     fdstdin = None
2494     try:
2495       fdstdin = open("/dev/null", "r")
2496       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2497                                stderr=subprocess.STDOUT, close_fds=True,
2498                                shell=False, cwd="/", env=env)
2499       output = ""
2500       try:
2501         output = child.stdout.read(4096)
2502         child.stdout.close()
2503       except EnvironmentError, err:
2504         output += "Hook script error: %s" % str(err)
2505
2506       while True:
2507         try:
2508           result = child.wait()
2509           break
2510         except EnvironmentError, err:
2511           if err.errno == errno.EINTR:
2512             continue
2513           raise
2514     finally:
2515       # try not to leak fds
2516       for fd in (fdstdin, ):
2517         if fd is not None:
2518           try:
2519             fd.close()
2520           except EnvironmentError, err:
2521             # just log the error
2522             #logging.exception("Error while closing fd %s", fd)
2523             pass
2524
2525     return result == 0, utils.SafeEncode(output.strip())
2526
2527   def RunHooks(self, hpath, phase, env):
2528     """Run the scripts in the hooks directory.
2529
2530     @type hpath: str
2531     @param hpath: the path to the hooks directory which
2532         holds the scripts
2533     @type phase: str
2534     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2535         L{constants.HOOKS_PHASE_POST}
2536     @type env: dict
2537     @param env: dictionary with the environment for the hook
2538     @rtype: list
2539     @return: list of 3-element tuples:
2540       - script path
2541       - script result, either L{constants.HKR_SUCCESS} or
2542         L{constants.HKR_FAIL}
2543       - output of the script
2544
2545     @raise errors.ProgrammerError: for invalid input
2546         parameters
2547
2548     """
2549     if phase == constants.HOOKS_PHASE_PRE:
2550       suffix = "pre"
2551     elif phase == constants.HOOKS_PHASE_POST:
2552       suffix = "post"
2553     else:
2554       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2555     rr = []
2556
2557     subdir = "%s-%s.d" % (hpath, suffix)
2558     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2559     try:
2560       dir_contents = utils.ListVisibleFiles(dir_name)
2561     except OSError, err:
2562       # FIXME: must log output in case of failures
2563       return rr
2564
2565     # we use the standard python sort order,
2566     # so 00name is the recommended naming scheme
2567     dir_contents.sort()
2568     for relname in dir_contents:
2569       fname = os.path.join(dir_name, relname)
2570       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2571           self.RE_MASK.match(relname) is not None):
2572         rrval = constants.HKR_SKIP
2573         output = ""
2574       else:
2575         result, output = self.ExecHook(fname, env)
2576         if not result:
2577           rrval = constants.HKR_FAIL
2578         else:
2579           rrval = constants.HKR_SUCCESS
2580       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2581
2582     return rr
2583
2584
2585 class IAllocatorRunner(object):
2586   """IAllocator runner.
2587
2588   This class is instantiated on the node side (ganeti-noded) and not on
2589   the master side.
2590
2591   """
2592   def Run(self, name, idata):
2593     """Run an iallocator script.
2594
2595     @type name: str
2596     @param name: the iallocator script name
2597     @type idata: str
2598     @param idata: the allocator input data
2599
2600     @rtype: tuple
2601     @return: four element tuple of:
2602        - run status (one of the IARUN_ constants)
2603        - stdout
2604        - stderr
2605        - fail reason (as from L{utils.RunResult})
2606
2607     """
2608     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2609                                   os.path.isfile)
2610     if alloc_script is None:
2611       return (constants.IARUN_NOTFOUND, None, None, None)
2612
2613     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2614     try:
2615       os.write(fd, idata)
2616       os.close(fd)
2617       result = utils.RunCmd([alloc_script, fin_name])
2618       if result.failed:
2619         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2620                 result.fail_reason)
2621     finally:
2622       os.unlink(fin_name)
2623
2624     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2625
2626
2627 class DevCacheManager(object):
2628   """Simple class for managing a cache of block device information.
2629
2630   """
2631   _DEV_PREFIX = "/dev/"
2632   _ROOT_DIR = constants.BDEV_CACHE_DIR
2633
2634   @classmethod
2635   def _ConvertPath(cls, dev_path):
2636     """Converts a /dev/name path to the cache file name.
2637
2638     This replaces slashes with underscores and strips the /dev
2639     prefix. It then returns the full path to the cache file.
2640
2641     @type dev_path: str
2642     @param dev_path: the C{/dev/} path name
2643     @rtype: str
2644     @return: the converted path name
2645
2646     """
2647     if dev_path.startswith(cls._DEV_PREFIX):
2648       dev_path = dev_path[len(cls._DEV_PREFIX):]
2649     dev_path = dev_path.replace("/", "_")
2650     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2651     return fpath
2652
2653   @classmethod
2654   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2655     """Updates the cache information for a given device.
2656
2657     @type dev_path: str
2658     @param dev_path: the pathname of the device
2659     @type owner: str
2660     @param owner: the owner (instance name) of the device
2661     @type on_primary: bool
2662     @param on_primary: whether this is the primary
2663         node nor not
2664     @type iv_name: str
2665     @param iv_name: the instance-visible name of the
2666         device, as in objects.Disk.iv_name
2667
2668     @rtype: None
2669
2670     """
2671     if dev_path is None:
2672       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2673       return
2674     fpath = cls._ConvertPath(dev_path)
2675     if on_primary:
2676       state = "primary"
2677     else:
2678       state = "secondary"
2679     if iv_name is None:
2680       iv_name = "not_visible"
2681     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2682     try:
2683       utils.WriteFile(fpath, data=fdata)
2684     except EnvironmentError, err:
2685       logging.exception("Can't update bdev cache for %s", dev_path)
2686
2687   @classmethod
2688   def RemoveCache(cls, dev_path):
2689     """Remove data for a dev_path.
2690
2691     This is just a wrapper over L{utils.RemoveFile} with a converted
2692     path name and logging.
2693
2694     @type dev_path: str
2695     @param dev_path: the pathname of the device
2696
2697     @rtype: None
2698
2699     """
2700     if dev_path is None:
2701       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2702       return
2703     fpath = cls._ConvertPath(dev_path)
2704     try:
2705       utils.RemoveFile(fpath)
2706     except EnvironmentError, err:
2707       logging.exception("Can't update bdev cache for %s", dev_path)