Xen: NIC parameters
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 class RPCFail(Exception):
50   """Class denoting RPC failure.
51
52   Its argument is the error message.
53
54   """
55
56 def _Fail(msg, *args, **kwargs):
57   """Log an error and the raise an RPCFail exception.
58
59   This exception is then handled specially in the ganeti daemon and
60   turned into a 'failed' return type. As such, this function is a
61   useful shortcut for logging the error and returning it to the master
62   daemon.
63
64   @type msg: string
65   @param msg: the text of the exception
66   @raise RPCFail
67
68   """
69   if args:
70     msg = msg % args
71   if "exc" in kwargs and kwargs["exc"]:
72     logging.exception(msg)
73   else:
74     logging.error(msg)
75   raise RPCFail(msg)
76
77
78 def _GetConfig():
79   """Simple wrapper to return a SimpleStore.
80
81   @rtype: L{ssconf.SimpleStore}
82   @return: a SimpleStore instance
83
84   """
85   return ssconf.SimpleStore()
86
87
88 def _GetSshRunner(cluster_name):
89   """Simple wrapper to return an SshRunner.
90
91   @type cluster_name: str
92   @param cluster_name: the cluster name, which is needed
93       by the SshRunner constructor
94   @rtype: L{ssh.SshRunner}
95   @return: an SshRunner instance
96
97   """
98   return ssh.SshRunner(cluster_name)
99
100
101 def _Decompress(data):
102   """Unpacks data compressed by the RPC client.
103
104   @type data: list or tuple
105   @param data: Data sent by RPC client
106   @rtype: str
107   @return: Decompressed data
108
109   """
110   assert isinstance(data, (list, tuple))
111   assert len(data) == 2
112   (encoding, content) = data
113   if encoding == constants.RPC_ENCODING_NONE:
114     return content
115   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
116     return zlib.decompress(base64.b64decode(content))
117   else:
118     raise AssertionError("Unknown data encoding")
119
120
121 def _CleanDirectory(path, exclude=None):
122   """Removes all regular files in a directory.
123
124   @type path: str
125   @param path: the directory to clean
126   @type exclude: list
127   @param exclude: list of files to be excluded, defaults
128       to the empty list
129
130   """
131   if not os.path.isdir(path):
132     return
133   if exclude is None:
134     exclude = []
135   else:
136     # Normalize excluded paths
137     exclude = [os.path.normpath(i) for i in exclude]
138
139   for rel_name in utils.ListVisibleFiles(path):
140     full_name = os.path.normpath(os.path.join(path, rel_name))
141     if full_name in exclude:
142       continue
143     if os.path.isfile(full_name) and not os.path.islink(full_name):
144       utils.RemoveFile(full_name)
145
146
147 def JobQueuePurge():
148   """Removes job queue files and archived jobs.
149
150   @rtype: None
151
152   """
153   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
154   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
155
156
157 def GetMasterInfo():
158   """Returns master information.
159
160   This is an utility function to compute master information, either
161   for consumption here or from the node daemon.
162
163   @rtype: tuple
164   @return: (master_netdev, master_ip, master_name) if we have a good
165       configuration, otherwise (None, None, None)
166
167   """
168   try:
169     cfg = _GetConfig()
170     master_netdev = cfg.GetMasterNetdev()
171     master_ip = cfg.GetMasterIP()
172     master_node = cfg.GetMasterNode()
173   except errors.ConfigurationError, err:
174     logging.exception("Cluster configuration incomplete")
175     return (None, None, None)
176   return (master_netdev, master_ip, master_node)
177
178
179 def StartMaster(start_daemons):
180   """Activate local node as master node.
181
182   The function will always try activate the IP address of the master
183   (unless someone else has it). It will also start the master daemons,
184   based on the start_daemons parameter.
185
186   @type start_daemons: boolean
187   @param start_daemons: whther to also start the master
188       daemons (ganeti-masterd and ganeti-rapi)
189   @rtype: None
190
191   """
192   ok = True
193   master_netdev, master_ip, _ = GetMasterInfo()
194   if not master_netdev:
195     return False
196
197   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
198     if utils.OwnIpAddress(master_ip):
199       # we already have the ip:
200       logging.debug("Already started")
201     else:
202       logging.error("Someone else has the master ip, not activating")
203       ok = False
204   else:
205     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
206                            "dev", master_netdev, "label",
207                            "%s:0" % master_netdev])
208     if result.failed:
209       logging.error("Can't activate master IP: %s", result.output)
210       ok = False
211
212     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
213                            "-s", master_ip, master_ip])
214     # we'll ignore the exit code of arping
215
216   # and now start the master and rapi daemons
217   if start_daemons:
218     for daemon in 'ganeti-masterd', 'ganeti-rapi':
219       result = utils.RunCmd([daemon])
220       if result.failed:
221         logging.error("Can't start daemon %s: %s", daemon, result.output)
222         ok = False
223   return ok
224
225
226 def StopMaster(stop_daemons):
227   """Deactivate this node as master.
228
229   The function will always try to deactivate the IP address of the
230   master. It will also stop the master daemons depending on the
231   stop_daemons parameter.
232
233   @type stop_daemons: boolean
234   @param stop_daemons: whether to also stop the master daemons
235       (ganeti-masterd and ganeti-rapi)
236   @rtype: None
237
238   """
239   master_netdev, master_ip, _ = GetMasterInfo()
240   if not master_netdev:
241     return False
242
243   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
244                          "dev", master_netdev])
245   if result.failed:
246     logging.error("Can't remove the master IP, error: %s", result.output)
247     # but otherwise ignore the failure
248
249   if stop_daemons:
250     # stop/kill the rapi and the master daemon
251     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
252       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
253
254   return True
255
256
257 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
258   """Joins this node to the cluster.
259
260   This does the following:
261       - updates the hostkeys of the machine (rsa and dsa)
262       - adds the ssh private key to the user
263       - adds the ssh public key to the users' authorized_keys file
264
265   @type dsa: str
266   @param dsa: the DSA private key to write
267   @type dsapub: str
268   @param dsapub: the DSA public key to write
269   @type rsa: str
270   @param rsa: the RSA private key to write
271   @type rsapub: str
272   @param rsapub: the RSA public key to write
273   @type sshkey: str
274   @param sshkey: the SSH private key to write
275   @type sshpub: str
276   @param sshpub: the SSH public key to write
277   @rtype: boolean
278   @return: the success of the operation
279
280   """
281   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
282                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
283                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
284                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
285   for name, content, mode in sshd_keys:
286     utils.WriteFile(name, data=content, mode=mode)
287
288   try:
289     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
290                                                     mkdir=True)
291   except errors.OpExecError, err:
292     _Fail("Error while processing user ssh files: %s", err, exc=True)
293
294   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
295     utils.WriteFile(name, data=content, mode=0600)
296
297   utils.AddAuthorizedKey(auth_keys, sshpub)
298
299   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
300
301   return (True, "Node added successfully")
302
303
304 def LeaveCluster():
305   """Cleans up and remove the current node.
306
307   This function cleans up and prepares the current node to be removed
308   from the cluster.
309
310   If processing is successful, then it raises an
311   L{errors.QuitGanetiException} which is used as a special case to
312   shutdown the node daemon.
313
314   """
315   _CleanDirectory(constants.DATA_DIR)
316   JobQueuePurge()
317
318   try:
319     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
320   except errors.OpExecError:
321     logging.exception("Error while processing ssh files")
322     return
323
324   f = open(pub_key, 'r')
325   try:
326     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
327   finally:
328     f.close()
329
330   utils.RemoveFile(priv_key)
331   utils.RemoveFile(pub_key)
332
333   # Return a reassuring string to the caller, and quit
334   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
335
336
337 def GetNodeInfo(vgname, hypervisor_type):
338   """Gives back a hash with different informations about the node.
339
340   @type vgname: C{string}
341   @param vgname: the name of the volume group to ask for disk space information
342   @type hypervisor_type: C{str}
343   @param hypervisor_type: the name of the hypervisor to ask for
344       memory information
345   @rtype: C{dict}
346   @return: dictionary with the following keys:
347       - vg_size is the size of the configured volume group in MiB
348       - vg_free is the free size of the volume group in MiB
349       - memory_dom0 is the memory allocated for domain0 in MiB
350       - memory_free is the currently available (free) ram in MiB
351       - memory_total is the total number of ram in MiB
352
353   """
354   outputarray = {}
355   vginfo = _GetVGInfo(vgname)
356   outputarray['vg_size'] = vginfo['vg_size']
357   outputarray['vg_free'] = vginfo['vg_free']
358
359   hyper = hypervisor.GetHypervisor(hypervisor_type)
360   hyp_info = hyper.GetNodeInfo()
361   if hyp_info is not None:
362     outputarray.update(hyp_info)
363
364   f = open("/proc/sys/kernel/random/boot_id", 'r')
365   try:
366     outputarray["bootid"] = f.read(128).rstrip("\n")
367   finally:
368     f.close()
369
370   return outputarray
371
372
373 def VerifyNode(what, cluster_name):
374   """Verify the status of the local node.
375
376   Based on the input L{what} parameter, various checks are done on the
377   local node.
378
379   If the I{filelist} key is present, this list of
380   files is checksummed and the file/checksum pairs are returned.
381
382   If the I{nodelist} key is present, we check that we have
383   connectivity via ssh with the target nodes (and check the hostname
384   report).
385
386   If the I{node-net-test} key is present, we check that we have
387   connectivity to the given nodes via both primary IP and, if
388   applicable, secondary IPs.
389
390   @type what: C{dict}
391   @param what: a dictionary of things to check:
392       - filelist: list of files for which to compute checksums
393       - nodelist: list of nodes we should check ssh communication with
394       - node-net-test: list of nodes we should check node daemon port
395         connectivity with
396       - hypervisor: list with hypervisors to run the verify for
397   @rtype: dict
398   @return: a dictionary with the same keys as the input dict, and
399       values representing the result of the checks
400
401   """
402   result = {}
403
404   if constants.NV_HYPERVISOR in what:
405     result[constants.NV_HYPERVISOR] = tmp = {}
406     for hv_name in what[constants.NV_HYPERVISOR]:
407       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
408
409   if constants.NV_FILELIST in what:
410     result[constants.NV_FILELIST] = utils.FingerprintFiles(
411       what[constants.NV_FILELIST])
412
413   if constants.NV_NODELIST in what:
414     result[constants.NV_NODELIST] = tmp = {}
415     random.shuffle(what[constants.NV_NODELIST])
416     for node in what[constants.NV_NODELIST]:
417       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
418       if not success:
419         tmp[node] = message
420
421   if constants.NV_NODENETTEST in what:
422     result[constants.NV_NODENETTEST] = tmp = {}
423     my_name = utils.HostInfo().name
424     my_pip = my_sip = None
425     for name, pip, sip in what[constants.NV_NODENETTEST]:
426       if name == my_name:
427         my_pip = pip
428         my_sip = sip
429         break
430     if not my_pip:
431       tmp[my_name] = ("Can't find my own primary/secondary IP"
432                       " in the node list")
433     else:
434       port = utils.GetNodeDaemonPort()
435       for name, pip, sip in what[constants.NV_NODENETTEST]:
436         fail = []
437         if not utils.TcpPing(pip, port, source=my_pip):
438           fail.append("primary")
439         if sip != pip:
440           if not utils.TcpPing(sip, port, source=my_sip):
441             fail.append("secondary")
442         if fail:
443           tmp[name] = ("failure using the %s interface(s)" %
444                        " and ".join(fail))
445
446   if constants.NV_LVLIST in what:
447     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
448
449   if constants.NV_INSTANCELIST in what:
450     result[constants.NV_INSTANCELIST] = GetInstanceList(
451       what[constants.NV_INSTANCELIST])
452
453   if constants.NV_VGLIST in what:
454     result[constants.NV_VGLIST] = ListVolumeGroups()
455
456   if constants.NV_VERSION in what:
457     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
458                                     constants.RELEASE_VERSION)
459
460   if constants.NV_HVINFO in what:
461     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
462     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
463
464   if constants.NV_DRBDLIST in what:
465     try:
466       used_minors = bdev.DRBD8.GetUsedDevs().keys()
467     except errors.BlockDeviceError, err:
468       logging.warning("Can't get used minors list", exc_info=True)
469       used_minors = str(err)
470     result[constants.NV_DRBDLIST] = used_minors
471
472   return result
473
474
475 def GetVolumeList(vg_name):
476   """Compute list of logical volumes and their size.
477
478   @type vg_name: str
479   @param vg_name: the volume group whose LVs we should list
480   @rtype: dict
481   @return:
482       dictionary of all partions (key) with value being a tuple of
483       their size (in MiB), inactive and online status::
484
485         {'test1': ('20.06', True, True)}
486
487       in case of errors, a string is returned with the error
488       details.
489
490   """
491   lvs = {}
492   sep = '|'
493   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
494                          "--separator=%s" % sep,
495                          "-olv_name,lv_size,lv_attr", vg_name])
496   if result.failed:
497     logging.error("Failed to list logical volumes, lvs output: %s",
498                   result.output)
499     return result.output
500
501   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
502   for line in result.stdout.splitlines():
503     line = line.strip()
504     match = valid_line_re.match(line)
505     if not match:
506       logging.error("Invalid line returned from lvs output: '%s'", line)
507       continue
508     name, size, attr = match.groups()
509     inactive = attr[4] == '-'
510     online = attr[5] == 'o'
511     lvs[name] = (size, inactive, online)
512
513   return lvs
514
515
516 def ListVolumeGroups():
517   """List the volume groups and their size.
518
519   @rtype: dict
520   @return: dictionary with keys volume name and values the
521       size of the volume
522
523   """
524   return utils.ListVolumeGroups()
525
526
527 def NodeVolumes():
528   """List all volumes on this node.
529
530   @rtype: list
531   @return:
532     A list of dictionaries, each having four keys:
533       - name: the logical volume name,
534       - size: the size of the logical volume
535       - dev: the physical device on which the LV lives
536       - vg: the volume group to which it belongs
537
538     In case of errors, we return an empty list and log the
539     error.
540
541     Note that since a logical volume can live on multiple physical
542     volumes, the resulting list might include a logical volume
543     multiple times.
544
545   """
546   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
547                          "--separator=|",
548                          "--options=lv_name,lv_size,devices,vg_name"])
549   if result.failed:
550     logging.error("Failed to list logical volumes, lvs output: %s",
551                   result.output)
552     return []
553
554   def parse_dev(dev):
555     if '(' in dev:
556       return dev.split('(')[0]
557     else:
558       return dev
559
560   def map_line(line):
561     return {
562       'name': line[0].strip(),
563       'size': line[1].strip(),
564       'dev': parse_dev(line[2].strip()),
565       'vg': line[3].strip(),
566     }
567
568   return [map_line(line.split('|')) for line in result.stdout.splitlines()
569           if line.count('|') >= 3]
570
571
572 def BridgesExist(bridges_list):
573   """Check if a list of bridges exist on the current node.
574
575   @rtype: boolean
576   @return: C{True} if all of them exist, C{False} otherwise
577
578   """
579   for bridge in bridges_list:
580     if not utils.BridgeExists(bridge):
581       return False
582
583   return True
584
585
586 def GetInstanceList(hypervisor_list):
587   """Provides a list of instances.
588
589   @type hypervisor_list: list
590   @param hypervisor_list: the list of hypervisors to query information
591
592   @rtype: list
593   @return: a list of all running instances on the current node
594     - instance1.example.com
595     - instance2.example.com
596
597   """
598   results = []
599   for hname in hypervisor_list:
600     try:
601       names = hypervisor.GetHypervisor(hname).ListInstances()
602       results.extend(names)
603     except errors.HypervisorError, err:
604       logging.exception("Error enumerating instances for hypevisor %s", hname)
605       raise
606
607   return results
608
609
610 def GetInstanceInfo(instance, hname):
611   """Gives back the informations about an instance as a dictionary.
612
613   @type instance: string
614   @param instance: the instance name
615   @type hname: string
616   @param hname: the hypervisor type of the instance
617
618   @rtype: dict
619   @return: dictionary with the following keys:
620       - memory: memory size of instance (int)
621       - state: xen state of instance (string)
622       - time: cpu time of instance (float)
623
624   """
625   output = {}
626
627   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
628   if iinfo is not None:
629     output['memory'] = iinfo[2]
630     output['state'] = iinfo[4]
631     output['time'] = iinfo[5]
632
633   return output
634
635
636 def GetInstanceMigratable(instance):
637   """Gives whether an instance can be migrated.
638
639   @type instance: L{objects.Instance}
640   @param instance: object representing the instance to be checked.
641
642   @rtype: tuple
643   @return: tuple of (result, description) where:
644       - result: whether the instance can be migrated or not
645       - description: a description of the issue, if relevant
646
647   """
648   hyper = hypervisor.GetHypervisor(instance.hypervisor)
649   if instance.name not in hyper.ListInstances():
650     return (False, 'not running')
651
652   for idx in range(len(instance.disks)):
653     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
654     if not os.path.islink(link_name):
655       return (False, 'not restarted since ganeti 1.2.5')
656
657   return (True, '')
658
659
660 def GetAllInstancesInfo(hypervisor_list):
661   """Gather data about all instances.
662
663   This is the equivalent of L{GetInstanceInfo}, except that it
664   computes data for all instances at once, thus being faster if one
665   needs data about more than one instance.
666
667   @type hypervisor_list: list
668   @param hypervisor_list: list of hypervisors to query for instance data
669
670   @rtype: dict
671   @return: dictionary of instance: data, with data having the following keys:
672       - memory: memory size of instance (int)
673       - state: xen state of instance (string)
674       - time: cpu time of instance (float)
675       - vcpus: the number of vcpus
676
677   """
678   output = {}
679
680   for hname in hypervisor_list:
681     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
682     if iinfo:
683       for name, inst_id, memory, vcpus, state, times in iinfo:
684         value = {
685           'memory': memory,
686           'vcpus': vcpus,
687           'state': state,
688           'time': times,
689           }
690         if name in output:
691           # we only check static parameters, like memory and vcpus,
692           # and not state and time which can change between the
693           # invocations of the different hypervisors
694           for key in 'memory', 'vcpus':
695             if value[key] != output[name][key]:
696               raise errors.HypervisorError("Instance %s is running twice"
697                                            " with different parameters" % name)
698         output[name] = value
699
700   return output
701
702
703 def InstanceOsAdd(instance, reinstall):
704   """Add an OS to an instance.
705
706   @type instance: L{objects.Instance}
707   @param instance: Instance whose OS is to be installed
708   @type reinstall: boolean
709   @param reinstall: whether this is an instance reinstall
710   @rtype: boolean
711   @return: the success of the operation
712
713   """
714   try:
715     inst_os = OSFromDisk(instance.os)
716   except errors.InvalidOS, err:
717     os_name, os_dir, os_err = err.args
718     if os_dir is None:
719       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
720     else:
721       return (False, "Error parsing OS '%s' in directory %s: %s" %
722               (os_name, os_dir, os_err))
723
724   create_env = OSEnvironment(instance)
725   if reinstall:
726     create_env['INSTANCE_REINSTALL'] = "1"
727
728   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
729                                      instance.name, int(time.time()))
730
731   result = utils.RunCmd([inst_os.create_script], env=create_env,
732                         cwd=inst_os.path, output=logfile,)
733   if result.failed:
734     logging.error("os create command '%s' returned error: %s, logfile: %s,"
735                   " output: %s", result.cmd, result.fail_reason, logfile,
736                   result.output)
737     lines = [utils.SafeEncode(val)
738              for val in utils.TailFile(logfile, lines=20)]
739     return (False, "OS create script failed (%s), last lines in the"
740             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
741
742   return (True, "Successfully installed")
743
744
745 def RunRenameInstance(instance, old_name):
746   """Run the OS rename script for an instance.
747
748   @type instance: L{objects.Instance}
749   @param instance: Instance whose OS is to be installed
750   @type old_name: string
751   @param old_name: previous instance name
752   @rtype: boolean
753   @return: the success of the operation
754
755   """
756   inst_os = OSFromDisk(instance.os)
757
758   rename_env = OSEnvironment(instance)
759   rename_env['OLD_INSTANCE_NAME'] = old_name
760
761   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
762                                            old_name,
763                                            instance.name, int(time.time()))
764
765   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
766                         cwd=inst_os.path, output=logfile)
767
768   if result.failed:
769     logging.error("os create command '%s' returned error: %s output: %s",
770                   result.cmd, result.fail_reason, result.output)
771     lines = [utils.SafeEncode(val)
772              for val in utils.TailFile(logfile, lines=20)]
773     return (False, "OS rename script failed (%s), last lines in the"
774             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
775
776   return (True, "Rename successful")
777
778
779 def _GetVGInfo(vg_name):
780   """Get informations about the volume group.
781
782   @type vg_name: str
783   @param vg_name: the volume group which we query
784   @rtype: dict
785   @return:
786     A dictionary with the following keys:
787       - C{vg_size} is the total size of the volume group in MiB
788       - C{vg_free} is the free size of the volume group in MiB
789       - C{pv_count} are the number of physical disks in that VG
790
791     If an error occurs during gathering of data, we return the same dict
792     with keys all set to None.
793
794   """
795   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
796
797   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
798                          "--nosuffix", "--units=m", "--separator=:", vg_name])
799
800   if retval.failed:
801     logging.error("volume group %s not present", vg_name)
802     return retdic
803   valarr = retval.stdout.strip().rstrip(':').split(':')
804   if len(valarr) == 3:
805     try:
806       retdic = {
807         "vg_size": int(round(float(valarr[0]), 0)),
808         "vg_free": int(round(float(valarr[1]), 0)),
809         "pv_count": int(valarr[2]),
810         }
811     except ValueError, err:
812       logging.exception("Fail to parse vgs output")
813   else:
814     logging.error("vgs output has the wrong number of fields (expected"
815                   " three): %s", str(valarr))
816   return retdic
817
818
819 def _GetBlockDevSymlinkPath(instance_name, idx):
820   return os.path.join(constants.DISK_LINKS_DIR,
821                       "%s:%d" % (instance_name, idx))
822
823
824 def _SymlinkBlockDev(instance_name, device_path, idx):
825   """Set up symlinks to a instance's block device.
826
827   This is an auxiliary function run when an instance is start (on the primary
828   node) or when an instance is migrated (on the target node).
829
830
831   @param instance_name: the name of the target instance
832   @param device_path: path of the physical block device, on the node
833   @param idx: the disk index
834   @return: absolute path to the disk's symlink
835
836   """
837   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
838   try:
839     os.symlink(device_path, link_name)
840   except OSError, err:
841     if err.errno == errno.EEXIST:
842       if (not os.path.islink(link_name) or
843           os.readlink(link_name) != device_path):
844         os.remove(link_name)
845         os.symlink(device_path, link_name)
846     else:
847       raise
848
849   return link_name
850
851
852 def _RemoveBlockDevLinks(instance_name, disks):
853   """Remove the block device symlinks belonging to the given instance.
854
855   """
856   for idx, disk in enumerate(disks):
857     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
858     if os.path.islink(link_name):
859       try:
860         os.remove(link_name)
861       except OSError:
862         logging.exception("Can't remove symlink '%s'", link_name)
863
864
865 def _GatherAndLinkBlockDevs(instance):
866   """Set up an instance's block device(s).
867
868   This is run on the primary node at instance startup. The block
869   devices must be already assembled.
870
871   @type instance: L{objects.Instance}
872   @param instance: the instance whose disks we shoul assemble
873   @rtype: list
874   @return: list of (disk_object, device_path)
875
876   """
877   block_devices = []
878   for idx, disk in enumerate(instance.disks):
879     device = _RecursiveFindBD(disk)
880     if device is None:
881       raise errors.BlockDeviceError("Block device '%s' is not set up." %
882                                     str(disk))
883     device.Open()
884     try:
885       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
886     except OSError, e:
887       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
888                                     e.strerror)
889
890     block_devices.append((disk, link_name))
891
892   return block_devices
893
894
895 def StartInstance(instance):
896   """Start an instance.
897
898   @type instance: L{objects.Instance}
899   @param instance: the instance object
900   @rtype: boolean
901   @return: whether the startup was successful or not
902
903   """
904   running_instances = GetInstanceList([instance.hypervisor])
905
906   if instance.name in running_instances:
907     return (True, "Already running")
908
909   try:
910     block_devices = _GatherAndLinkBlockDevs(instance)
911     hyper = hypervisor.GetHypervisor(instance.hypervisor)
912     hyper.StartInstance(instance, block_devices)
913   except errors.BlockDeviceError, err:
914     _Fail("Block device error: %s", err, exc=True)
915   except errors.HypervisorError, err:
916     _RemoveBlockDevLinks(instance.name, instance.disks)
917     _Fail("Hypervisor error: %s", err, exc=True)
918
919   return (True, "Instance started successfully")
920
921
922 def InstanceShutdown(instance):
923   """Shut an instance down.
924
925   @note: this functions uses polling with a hardcoded timeout.
926
927   @type instance: L{objects.Instance}
928   @param instance: the instance object
929   @rtype: boolean
930   @return: whether the startup was successful or not
931
932   """
933   hv_name = instance.hypervisor
934   running_instances = GetInstanceList([hv_name])
935
936   if instance.name not in running_instances:
937     return (True, "Instance already stopped")
938
939   hyper = hypervisor.GetHypervisor(hv_name)
940   try:
941     hyper.StopInstance(instance)
942   except errors.HypervisorError, err:
943     _Fail("Failed to stop instance %s: %s", instance.name, err)
944
945   # test every 10secs for 2min
946
947   time.sleep(1)
948   for dummy in range(11):
949     if instance.name not in GetInstanceList([hv_name]):
950       break
951     time.sleep(10)
952   else:
953     # the shutdown did not succeed
954     logging.error("Shutdown of '%s' unsuccessful, using destroy",
955                   instance.name)
956
957     try:
958       hyper.StopInstance(instance, force=True)
959     except errors.HypervisorError, err:
960       _Fail("Failed to force stop instance %s: %s", instance.name, err)
961
962     time.sleep(1)
963     if instance.name in GetInstanceList([hv_name]):
964       _Fail("Could not shutdown instance %s even by destroy", instance.name)
965
966   _RemoveBlockDevLinks(instance.name, instance.disks)
967
968   return (True, "Instance has been shutdown successfully")
969
970
971 def InstanceReboot(instance, reboot_type):
972   """Reboot an instance.
973
974   @type instance: L{objects.Instance}
975   @param instance: the instance object to reboot
976   @type reboot_type: str
977   @param reboot_type: the type of reboot, one the following
978     constants:
979       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
980         instance OS, do not recreate the VM
981       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
982         restart the VM (at the hypervisor level)
983       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
984         is not accepted here, since that mode is handled
985         differently
986   @rtype: boolean
987   @return: the success of the operation
988
989   """
990   running_instances = GetInstanceList([instance.hypervisor])
991
992   if instance.name not in running_instances:
993     _Fail("Cannot reboot instance %s that is not running", instance.name)
994
995   hyper = hypervisor.GetHypervisor(instance.hypervisor)
996   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
997     try:
998       hyper.RebootInstance(instance)
999     except errors.HypervisorError, err:
1000       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1001   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1002     try:
1003       stop_result = InstanceShutdown(instance)
1004       if not stop_result[0]:
1005         return stop_result
1006       return StartInstance(instance)
1007     except errors.HypervisorError, err:
1008       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1009   else:
1010     _Fail("Invalid reboot_type received: %s", reboot_type)
1011
1012   return (True, "Reboot successful")
1013
1014
1015 def MigrationInfo(instance):
1016   """Gather information about an instance to be migrated.
1017
1018   @type instance: L{objects.Instance}
1019   @param instance: the instance definition
1020
1021   """
1022   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1023   try:
1024     info = hyper.MigrationInfo(instance)
1025   except errors.HypervisorError, err:
1026     _Fail("Failed to fetch migration information: %s", err, exc=True)
1027   return (True, info)
1028
1029
1030 def AcceptInstance(instance, info, target):
1031   """Prepare the node to accept an instance.
1032
1033   @type instance: L{objects.Instance}
1034   @param instance: the instance definition
1035   @type info: string/data (opaque)
1036   @param info: migration information, from the source node
1037   @type target: string
1038   @param target: target host (usually ip), on this node
1039
1040   """
1041   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1042   try:
1043     hyper.AcceptInstance(instance, info, target)
1044   except errors.HypervisorError, err:
1045     _Fail("Failed to accept instance: %s", err, exc=True)
1046   return (True, "Accept successfull")
1047
1048
1049 def FinalizeMigration(instance, info, success):
1050   """Finalize any preparation to accept an instance.
1051
1052   @type instance: L{objects.Instance}
1053   @param instance: the instance definition
1054   @type info: string/data (opaque)
1055   @param info: migration information, from the source node
1056   @type success: boolean
1057   @param success: whether the migration was a success or a failure
1058
1059   """
1060   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1061   try:
1062     hyper.FinalizeMigration(instance, info, success)
1063   except errors.HypervisorError, err:
1064     _Fail("Failed to finalize migration: %s", err, exc=True)
1065   return (True, "Migration Finalized")
1066
1067
1068 def MigrateInstance(instance, target, live):
1069   """Migrates an instance to another node.
1070
1071   @type instance: L{objects.Instance}
1072   @param instance: the instance definition
1073   @type target: string
1074   @param target: the target node name
1075   @type live: boolean
1076   @param live: whether the migration should be done live or not (the
1077       interpretation of this parameter is left to the hypervisor)
1078   @rtype: tuple
1079   @return: a tuple of (success, msg) where:
1080       - succes is a boolean denoting the success/failure of the operation
1081       - msg is a string with details in case of failure
1082
1083   """
1084   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1085
1086   try:
1087     hyper.MigrateInstance(instance.name, target, live)
1088   except errors.HypervisorError, err:
1089     _Fail("Failed to migrate instance: %s", err, exc=True)
1090   return (True, "Migration successfull")
1091
1092
1093 def BlockdevCreate(disk, size, owner, on_primary, info):
1094   """Creates a block device for an instance.
1095
1096   @type disk: L{objects.Disk}
1097   @param disk: the object describing the disk we should create
1098   @type size: int
1099   @param size: the size of the physical underlying device, in MiB
1100   @type owner: str
1101   @param owner: the name of the instance for which disk is created,
1102       used for device cache data
1103   @type on_primary: boolean
1104   @param on_primary:  indicates if it is the primary node or not
1105   @type info: string
1106   @param info: string that will be sent to the physical device
1107       creation, used for example to set (LVM) tags on LVs
1108
1109   @return: the new unique_id of the device (this can sometime be
1110       computed only after creation), or None. On secondary nodes,
1111       it's not required to return anything.
1112
1113   """
1114   clist = []
1115   if disk.children:
1116     for child in disk.children:
1117       try:
1118         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1119       except errors.BlockDeviceError, err:
1120         _Fail("Can't assemble device %s: %s", child, err)
1121       if on_primary or disk.AssembleOnSecondary():
1122         # we need the children open in case the device itself has to
1123         # be assembled
1124         try:
1125           crdev.Open()
1126         except errors.BlockDeviceError, err:
1127           _Fail("Can't make child '%s' read-write: %s", child, err)
1128       clist.append(crdev)
1129
1130   try:
1131     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1132   except errors.BlockDeviceError, err:
1133     _Fail("Can't create block device: %s", err)
1134
1135   if on_primary or disk.AssembleOnSecondary():
1136     try:
1137       device.Assemble()
1138     except errors.BlockDeviceError, err:
1139       _Fail("Can't assemble device after creation, unusual event: %s", err)
1140     device.SetSyncSpeed(constants.SYNC_SPEED)
1141     if on_primary or disk.OpenOnSecondary():
1142       try:
1143         device.Open(force=True)
1144       except errors.BlockDeviceError, err:
1145         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1146     DevCacheManager.UpdateCache(device.dev_path, owner,
1147                                 on_primary, disk.iv_name)
1148
1149   device.SetInfo(info)
1150
1151   physical_id = device.unique_id
1152   return True, physical_id
1153
1154
1155 def BlockdevRemove(disk):
1156   """Remove a block device.
1157
1158   @note: This is intended to be called recursively.
1159
1160   @type disk: L{objects.Disk}
1161   @param disk: the disk object we should remove
1162   @rtype: boolean
1163   @return: the success of the operation
1164
1165   """
1166   msgs = []
1167   result = True
1168   try:
1169     rdev = _RecursiveFindBD(disk)
1170   except errors.BlockDeviceError, err:
1171     # probably can't attach
1172     logging.info("Can't attach to device %s in remove", disk)
1173     rdev = None
1174   if rdev is not None:
1175     r_path = rdev.dev_path
1176     try:
1177       rdev.Remove()
1178     except errors.BlockDeviceError, err:
1179       msgs.append(str(err))
1180       result = False
1181     if result:
1182       DevCacheManager.RemoveCache(r_path)
1183
1184   if disk.children:
1185     for child in disk.children:
1186       c_status, c_msg = BlockdevRemove(child)
1187       result = result and c_status
1188       if c_msg: # not an empty message
1189         msgs.append(c_msg)
1190
1191   return (result, "; ".join(msgs))
1192
1193
1194 def _RecursiveAssembleBD(disk, owner, as_primary):
1195   """Activate a block device for an instance.
1196
1197   This is run on the primary and secondary nodes for an instance.
1198
1199   @note: this function is called recursively.
1200
1201   @type disk: L{objects.Disk}
1202   @param disk: the disk we try to assemble
1203   @type owner: str
1204   @param owner: the name of the instance which owns the disk
1205   @type as_primary: boolean
1206   @param as_primary: if we should make the block device
1207       read/write
1208
1209   @return: the assembled device or None (in case no device
1210       was assembled)
1211   @raise errors.BlockDeviceError: in case there is an error
1212       during the activation of the children or the device
1213       itself
1214
1215   """
1216   children = []
1217   if disk.children:
1218     mcn = disk.ChildrenNeeded()
1219     if mcn == -1:
1220       mcn = 0 # max number of Nones allowed
1221     else:
1222       mcn = len(disk.children) - mcn # max number of Nones
1223     for chld_disk in disk.children:
1224       try:
1225         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1226       except errors.BlockDeviceError, err:
1227         if children.count(None) >= mcn:
1228           raise
1229         cdev = None
1230         logging.error("Error in child activation (but continuing): %s",
1231                       str(err))
1232       children.append(cdev)
1233
1234   if as_primary or disk.AssembleOnSecondary():
1235     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1236     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1237     result = r_dev
1238     if as_primary or disk.OpenOnSecondary():
1239       r_dev.Open()
1240     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1241                                 as_primary, disk.iv_name)
1242
1243   else:
1244     result = True
1245   return result
1246
1247
1248 def BlockdevAssemble(disk, owner, as_primary):
1249   """Activate a block device for an instance.
1250
1251   This is a wrapper over _RecursiveAssembleBD.
1252
1253   @rtype: str or boolean
1254   @return: a C{/dev/...} path for primary nodes, and
1255       C{True} for secondary nodes
1256
1257   """
1258   status = True
1259   result = "no error information"
1260   try:
1261     result = _RecursiveAssembleBD(disk, owner, as_primary)
1262     if isinstance(result, bdev.BlockDev):
1263       result = result.dev_path
1264   except errors.BlockDeviceError, err:
1265     result = "Error while assembling disk: %s" % str(err)
1266     status = False
1267   return (status, result)
1268
1269
1270 def BlockdevShutdown(disk):
1271   """Shut down a block device.
1272
1273   First, if the device is assembled (Attach() is successfull), then
1274   the device is shutdown. Then the children of the device are
1275   shutdown.
1276
1277   This function is called recursively. Note that we don't cache the
1278   children or such, as oppossed to assemble, shutdown of different
1279   devices doesn't require that the upper device was active.
1280
1281   @type disk: L{objects.Disk}
1282   @param disk: the description of the disk we should
1283       shutdown
1284   @rtype: boolean
1285   @return: the success of the operation
1286
1287   """
1288   msgs = []
1289   result = True
1290   r_dev = _RecursiveFindBD(disk)
1291   if r_dev is not None:
1292     r_path = r_dev.dev_path
1293     try:
1294       r_dev.Shutdown()
1295       DevCacheManager.RemoveCache(r_path)
1296     except errors.BlockDeviceError, err:
1297       msgs.append(str(err))
1298       result = False
1299
1300   if disk.children:
1301     for child in disk.children:
1302       c_status, c_msg = BlockdevShutdown(child)
1303       result = result and c_status
1304       if c_msg: # not an empty message
1305         msgs.append(c_msg)
1306
1307   return (result, "; ".join(msgs))
1308
1309
1310 def BlockdevAddchildren(parent_cdev, new_cdevs):
1311   """Extend a mirrored block device.
1312
1313   @type parent_cdev: L{objects.Disk}
1314   @param parent_cdev: the disk to which we should add children
1315   @type new_cdevs: list of L{objects.Disk}
1316   @param new_cdevs: the list of children which we should add
1317   @rtype: boolean
1318   @return: the success of the operation
1319
1320   """
1321   parent_bdev = _RecursiveFindBD(parent_cdev)
1322   if parent_bdev is None:
1323     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1324   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1325   if new_bdevs.count(None) > 0:
1326     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1327   parent_bdev.AddChildren(new_bdevs)
1328   return (True, None)
1329
1330
1331 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1332   """Shrink a mirrored block device.
1333
1334   @type parent_cdev: L{objects.Disk}
1335   @param parent_cdev: the disk from which we should remove children
1336   @type new_cdevs: list of L{objects.Disk}
1337   @param new_cdevs: the list of children which we should remove
1338   @rtype: boolean
1339   @return: the success of the operation
1340
1341   """
1342   parent_bdev = _RecursiveFindBD(parent_cdev)
1343   if parent_bdev is None:
1344     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1345   devs = []
1346   for disk in new_cdevs:
1347     rpath = disk.StaticDevPath()
1348     if rpath is None:
1349       bd = _RecursiveFindBD(disk)
1350       if bd is None:
1351         _Fail("Can't find device %s while removing children", disk)
1352       else:
1353         devs.append(bd.dev_path)
1354     else:
1355       devs.append(rpath)
1356   parent_bdev.RemoveChildren(devs)
1357   return (True, None)
1358
1359
1360 def BlockdevGetmirrorstatus(disks):
1361   """Get the mirroring status of a list of devices.
1362
1363   @type disks: list of L{objects.Disk}
1364   @param disks: the list of disks which we should query
1365   @rtype: disk
1366   @return:
1367       a list of (mirror_done, estimated_time) tuples, which
1368       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1369   @raise errors.BlockDeviceError: if any of the disks cannot be
1370       found
1371
1372   """
1373   stats = []
1374   for dsk in disks:
1375     rbd = _RecursiveFindBD(dsk)
1376     if rbd is None:
1377       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1378     stats.append(rbd.CombinedSyncStatus())
1379   return stats
1380
1381
1382 def _RecursiveFindBD(disk):
1383   """Check if a device is activated.
1384
1385   If so, return informations about the real device.
1386
1387   @type disk: L{objects.Disk}
1388   @param disk: the disk object we need to find
1389
1390   @return: None if the device can't be found,
1391       otherwise the device instance
1392
1393   """
1394   children = []
1395   if disk.children:
1396     for chdisk in disk.children:
1397       children.append(_RecursiveFindBD(chdisk))
1398
1399   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1400
1401
1402 def BlockdevFind(disk):
1403   """Check if a device is activated.
1404
1405   If it is, return informations about the real device.
1406
1407   @type disk: L{objects.Disk}
1408   @param disk: the disk to find
1409   @rtype: None or tuple
1410   @return: None if the disk cannot be found, otherwise a
1411       tuple (device_path, major, minor, sync_percent,
1412       estimated_time, is_degraded)
1413
1414   """
1415   try:
1416     rbd = _RecursiveFindBD(disk)
1417   except errors.BlockDeviceError, err:
1418     _Fail("Failed to find device: %s", err, exc=True)
1419   if rbd is None:
1420     return (True, None)
1421   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1422
1423
1424 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1425   """Write a file to the filesystem.
1426
1427   This allows the master to overwrite(!) a file. It will only perform
1428   the operation if the file belongs to a list of configuration files.
1429
1430   @type file_name: str
1431   @param file_name: the target file name
1432   @type data: str
1433   @param data: the new contents of the file
1434   @type mode: int
1435   @param mode: the mode to give the file (can be None)
1436   @type uid: int
1437   @param uid: the owner of the file (can be -1 for default)
1438   @type gid: int
1439   @param gid: the group of the file (can be -1 for default)
1440   @type atime: float
1441   @param atime: the atime to set on the file (can be None)
1442   @type mtime: float
1443   @param mtime: the mtime to set on the file (can be None)
1444   @rtype: boolean
1445   @return: the success of the operation; errors are logged
1446       in the node daemon log
1447
1448   """
1449   if not os.path.isabs(file_name):
1450     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1451
1452   allowed_files = set([
1453     constants.CLUSTER_CONF_FILE,
1454     constants.ETC_HOSTS,
1455     constants.SSH_KNOWN_HOSTS_FILE,
1456     constants.VNC_PASSWORD_FILE,
1457     constants.RAPI_CERT_FILE,
1458     constants.RAPI_USERS_FILE,
1459     ])
1460
1461   for hv_name in constants.HYPER_TYPES:
1462     hv_class = hypervisor.GetHypervisor(hv_name)
1463     allowed_files.update(hv_class.GetAncillaryFiles())
1464
1465   if file_name not in allowed_files:
1466     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1467           file_name)
1468
1469   raw_data = _Decompress(data)
1470
1471   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1472                   atime=atime, mtime=mtime)
1473   return (True, "success")
1474
1475
1476 def WriteSsconfFiles(values):
1477   """Update all ssconf files.
1478
1479   Wrapper around the SimpleStore.WriteFiles.
1480
1481   """
1482   ssconf.SimpleStore().WriteFiles(values)
1483
1484
1485 def _ErrnoOrStr(err):
1486   """Format an EnvironmentError exception.
1487
1488   If the L{err} argument has an errno attribute, it will be looked up
1489   and converted into a textual C{E...} description. Otherwise the
1490   string representation of the error will be returned.
1491
1492   @type err: L{EnvironmentError}
1493   @param err: the exception to format
1494
1495   """
1496   if hasattr(err, 'errno'):
1497     detail = errno.errorcode[err.errno]
1498   else:
1499     detail = str(err)
1500   return detail
1501
1502
1503 def _OSOndiskVersion(name, os_dir):
1504   """Compute and return the API version of a given OS.
1505
1506   This function will try to read the API version of the OS given by
1507   the 'name' parameter and residing in the 'os_dir' directory.
1508
1509   @type name: str
1510   @param name: the OS name we should look for
1511   @type os_dir: str
1512   @param os_dir: the directory inwhich we should look for the OS
1513   @rtype: int or None
1514   @return:
1515       Either an integer denoting the version or None in the
1516       case when this is not a valid OS name.
1517   @raise errors.InvalidOS: if the OS cannot be found
1518
1519   """
1520   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1521
1522   try:
1523     st = os.stat(api_file)
1524   except EnvironmentError, err:
1525     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1526                            " found (%s)" % _ErrnoOrStr(err))
1527
1528   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1529     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1530                            " a regular file")
1531
1532   try:
1533     f = open(api_file)
1534     try:
1535       api_versions = f.readlines()
1536     finally:
1537       f.close()
1538   except EnvironmentError, err:
1539     raise errors.InvalidOS(name, os_dir, "error while reading the"
1540                            " API version (%s)" % _ErrnoOrStr(err))
1541
1542   api_versions = [version.strip() for version in api_versions]
1543   try:
1544     api_versions = [int(version) for version in api_versions]
1545   except (TypeError, ValueError), err:
1546     raise errors.InvalidOS(name, os_dir,
1547                            "API version is not integer (%s)" % str(err))
1548
1549   return api_versions
1550
1551
1552 def DiagnoseOS(top_dirs=None):
1553   """Compute the validity for all OSes.
1554
1555   @type top_dirs: list
1556   @param top_dirs: the list of directories in which to
1557       search (if not given defaults to
1558       L{constants.OS_SEARCH_PATH})
1559   @rtype: list of L{objects.OS}
1560   @return: an OS object for each name in all the given
1561       directories
1562
1563   """
1564   if top_dirs is None:
1565     top_dirs = constants.OS_SEARCH_PATH
1566
1567   result = []
1568   for dir_name in top_dirs:
1569     if os.path.isdir(dir_name):
1570       try:
1571         f_names = utils.ListVisibleFiles(dir_name)
1572       except EnvironmentError, err:
1573         logging.exception("Can't list the OS directory %s", dir_name)
1574         break
1575       for name in f_names:
1576         try:
1577           os_inst = OSFromDisk(name, base_dir=dir_name)
1578           result.append(os_inst)
1579         except errors.InvalidOS, err:
1580           result.append(objects.OS.FromInvalidOS(err))
1581
1582   return result
1583
1584
1585 def OSFromDisk(name, base_dir=None):
1586   """Create an OS instance from disk.
1587
1588   This function will return an OS instance if the given name is a
1589   valid OS name. Otherwise, it will raise an appropriate
1590   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1591
1592   @type base_dir: string
1593   @keyword base_dir: Base directory containing OS installations.
1594                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1595   @rtype: L{objects.OS}
1596   @return: the OS instance if we find a valid one
1597   @raise errors.InvalidOS: if we don't find a valid OS
1598
1599   """
1600   if base_dir is None:
1601     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1602     if os_dir is None:
1603       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1604   else:
1605     os_dir = os.path.sep.join([base_dir, name])
1606
1607   api_versions = _OSOndiskVersion(name, os_dir)
1608
1609   if constants.OS_API_VERSION not in api_versions:
1610     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1611                            " (found %s want %s)"
1612                            % (api_versions, constants.OS_API_VERSION))
1613
1614   # OS Scripts dictionary, we will populate it with the actual script names
1615   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1616
1617   for script in os_scripts:
1618     os_scripts[script] = os.path.sep.join([os_dir, script])
1619
1620     try:
1621       st = os.stat(os_scripts[script])
1622     except EnvironmentError, err:
1623       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1624                              (script, _ErrnoOrStr(err)))
1625
1626     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1627       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1628                              script)
1629
1630     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1631       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1632                              script)
1633
1634
1635   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1636                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1637                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1638                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1639                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1640                     api_versions=api_versions)
1641
1642 def OSEnvironment(instance, debug=0):
1643   """Calculate the environment for an os script.
1644
1645   @type instance: L{objects.Instance}
1646   @param instance: target instance for the os script run
1647   @type debug: integer
1648   @param debug: debug level (0 or 1, for OS Api 10)
1649   @rtype: dict
1650   @return: dict of environment variables
1651   @raise errors.BlockDeviceError: if the block device
1652       cannot be found
1653
1654   """
1655   result = {}
1656   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1657   result['INSTANCE_NAME'] = instance.name
1658   result['INSTANCE_OS'] = instance.os
1659   result['HYPERVISOR'] = instance.hypervisor
1660   result['DISK_COUNT'] = '%d' % len(instance.disks)
1661   result['NIC_COUNT'] = '%d' % len(instance.nics)
1662   result['DEBUG_LEVEL'] = '%d' % debug
1663   for idx, disk in enumerate(instance.disks):
1664     real_disk = _RecursiveFindBD(disk)
1665     if real_disk is None:
1666       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1667                                     str(disk))
1668     real_disk.Open()
1669     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1670     result['DISK_%d_ACCESS' % idx] = disk.mode
1671     if constants.HV_DISK_TYPE in instance.hvparams:
1672       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1673         instance.hvparams[constants.HV_DISK_TYPE]
1674     if disk.dev_type in constants.LDS_BLOCK:
1675       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1676     elif disk.dev_type == constants.LD_FILE:
1677       result['DISK_%d_BACKEND_TYPE' % idx] = \
1678         'file:%s' % disk.physical_id[0]
1679   for idx, nic in enumerate(instance.nics):
1680     result['NIC_%d_MAC' % idx] = nic.mac
1681     if nic.ip:
1682       result['NIC_%d_IP' % idx] = nic.ip
1683     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1684     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1685       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1686     if nic.nicparams[constants.NIC_LINK]:
1687       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1688     if constants.HV_NIC_TYPE in instance.hvparams:
1689       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1690         instance.hvparams[constants.HV_NIC_TYPE]
1691
1692   return result
1693
1694 def BlockdevGrow(disk, amount):
1695   """Grow a stack of block devices.
1696
1697   This function is called recursively, with the childrens being the
1698   first ones to resize.
1699
1700   @type disk: L{objects.Disk}
1701   @param disk: the disk to be grown
1702   @rtype: (status, result)
1703   @return: a tuple with the status of the operation
1704       (True/False), and the errors message if status
1705       is False
1706
1707   """
1708   r_dev = _RecursiveFindBD(disk)
1709   if r_dev is None:
1710     return False, "Cannot find block device %s" % (disk,)
1711
1712   try:
1713     r_dev.Grow(amount)
1714   except errors.BlockDeviceError, err:
1715     _Fail("Failed to grow block device: %s", err, exc=True)
1716
1717   return True, None
1718
1719
1720 def BlockdevSnapshot(disk):
1721   """Create a snapshot copy of a block device.
1722
1723   This function is called recursively, and the snapshot is actually created
1724   just for the leaf lvm backend device.
1725
1726   @type disk: L{objects.Disk}
1727   @param disk: the disk to be snapshotted
1728   @rtype: string
1729   @return: snapshot disk path
1730
1731   """
1732   if disk.children:
1733     if len(disk.children) == 1:
1734       # only one child, let's recurse on it
1735       return BlockdevSnapshot(disk.children[0])
1736     else:
1737       # more than one child, choose one that matches
1738       for child in disk.children:
1739         if child.size == disk.size:
1740           # return implies breaking the loop
1741           return BlockdevSnapshot(child)
1742   elif disk.dev_type == constants.LD_LV:
1743     r_dev = _RecursiveFindBD(disk)
1744     if r_dev is not None:
1745       # let's stay on the safe side and ask for the full size, for now
1746       return r_dev.Snapshot(disk.size)
1747     else:
1748       return None
1749   else:
1750     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1751                                  " '%s' of type '%s'" %
1752                                  (disk.unique_id, disk.dev_type))
1753
1754
1755 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1756   """Export a block device snapshot to a remote node.
1757
1758   @type disk: L{objects.Disk}
1759   @param disk: the description of the disk to export
1760   @type dest_node: str
1761   @param dest_node: the destination node to export to
1762   @type instance: L{objects.Instance}
1763   @param instance: the instance object to whom the disk belongs
1764   @type cluster_name: str
1765   @param cluster_name: the cluster name, needed for SSH hostalias
1766   @type idx: int
1767   @param idx: the index of the disk in the instance's disk list,
1768       used to export to the OS scripts environment
1769   @rtype: boolean
1770   @return: the success of the operation
1771
1772   """
1773   export_env = OSEnvironment(instance)
1774
1775   inst_os = OSFromDisk(instance.os)
1776   export_script = inst_os.export_script
1777
1778   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1779                                      instance.name, int(time.time()))
1780   if not os.path.exists(constants.LOG_OS_DIR):
1781     os.mkdir(constants.LOG_OS_DIR, 0750)
1782   real_disk = _RecursiveFindBD(disk)
1783   if real_disk is None:
1784     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1785                                   str(disk))
1786   real_disk.Open()
1787
1788   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1789   export_env['EXPORT_INDEX'] = str(idx)
1790
1791   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1792   destfile = disk.physical_id[1]
1793
1794   # the target command is built out of three individual commands,
1795   # which are joined by pipes; we check each individual command for
1796   # valid parameters
1797   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1798                                export_script, logfile)
1799
1800   comprcmd = "gzip"
1801
1802   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1803                                 destdir, destdir, destfile)
1804   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1805                                                    constants.GANETI_RUNAS,
1806                                                    destcmd)
1807
1808   # all commands have been checked, so we're safe to combine them
1809   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1810
1811   result = utils.RunCmd(command, env=export_env)
1812
1813   if result.failed:
1814     logging.error("os snapshot export command '%s' returned error: %s"
1815                   " output: %s", command, result.fail_reason, result.output)
1816     return False
1817
1818   return True
1819
1820
1821 def FinalizeExport(instance, snap_disks):
1822   """Write out the export configuration information.
1823
1824   @type instance: L{objects.Instance}
1825   @param instance: the instance which we export, used for
1826       saving configuration
1827   @type snap_disks: list of L{objects.Disk}
1828   @param snap_disks: list of snapshot block devices, which
1829       will be used to get the actual name of the dump file
1830
1831   @rtype: boolean
1832   @return: the success of the operation
1833
1834   """
1835   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1836   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1837
1838   config = objects.SerializableConfigParser()
1839
1840   config.add_section(constants.INISECT_EXP)
1841   config.set(constants.INISECT_EXP, 'version', '0')
1842   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1843   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1844   config.set(constants.INISECT_EXP, 'os', instance.os)
1845   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1846
1847   config.add_section(constants.INISECT_INS)
1848   config.set(constants.INISECT_INS, 'name', instance.name)
1849   config.set(constants.INISECT_INS, 'memory', '%d' %
1850              instance.beparams[constants.BE_MEMORY])
1851   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1852              instance.beparams[constants.BE_VCPUS])
1853   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1854
1855   nic_total = 0
1856   for nic_count, nic in enumerate(instance.nics):
1857     nic_total += 1
1858     config.set(constants.INISECT_INS, 'nic%d_mac' %
1859                nic_count, '%s' % nic.mac)
1860     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1861     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1862                '%s' % nic.bridge)
1863   # TODO: redundant: on load can read nics until it doesn't exist
1864   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1865
1866   disk_total = 0
1867   for disk_count, disk in enumerate(snap_disks):
1868     if disk:
1869       disk_total += 1
1870       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1871                  ('%s' % disk.iv_name))
1872       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1873                  ('%s' % disk.physical_id[1]))
1874       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1875                  ('%d' % disk.size))
1876
1877   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1878
1879   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1880                   data=config.Dumps())
1881   shutil.rmtree(finaldestdir, True)
1882   shutil.move(destdir, finaldestdir)
1883
1884   return True
1885
1886
1887 def ExportInfo(dest):
1888   """Get export configuration information.
1889
1890   @type dest: str
1891   @param dest: directory containing the export
1892
1893   @rtype: L{objects.SerializableConfigParser}
1894   @return: a serializable config file containing the
1895       export info
1896
1897   """
1898   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1899
1900   config = objects.SerializableConfigParser()
1901   config.read(cff)
1902
1903   if (not config.has_section(constants.INISECT_EXP) or
1904       not config.has_section(constants.INISECT_INS)):
1905     return None
1906
1907   return config
1908
1909
1910 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1911   """Import an os image into an instance.
1912
1913   @type instance: L{objects.Instance}
1914   @param instance: instance to import the disks into
1915   @type src_node: string
1916   @param src_node: source node for the disk images
1917   @type src_images: list of string
1918   @param src_images: absolute paths of the disk images
1919   @rtype: list of boolean
1920   @return: each boolean represent the success of importing the n-th disk
1921
1922   """
1923   import_env = OSEnvironment(instance)
1924   inst_os = OSFromDisk(instance.os)
1925   import_script = inst_os.import_script
1926
1927   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1928                                         instance.name, int(time.time()))
1929   if not os.path.exists(constants.LOG_OS_DIR):
1930     os.mkdir(constants.LOG_OS_DIR, 0750)
1931
1932   comprcmd = "gunzip"
1933   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1934                                import_script, logfile)
1935
1936   final_result = []
1937   for idx, image in enumerate(src_images):
1938     if image:
1939       destcmd = utils.BuildShellCmd('cat %s', image)
1940       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1941                                                        constants.GANETI_RUNAS,
1942                                                        destcmd)
1943       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1944       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1945       import_env['IMPORT_INDEX'] = str(idx)
1946       result = utils.RunCmd(command, env=import_env)
1947       if result.failed:
1948         logging.error("Disk import command '%s' returned error: %s"
1949                       " output: %s", command, result.fail_reason,
1950                       result.output)
1951         final_result.append(False)
1952       else:
1953         final_result.append(True)
1954     else:
1955       final_result.append(True)
1956
1957   return final_result
1958
1959
1960 def ListExports():
1961   """Return a list of exports currently available on this machine.
1962
1963   @rtype: list
1964   @return: list of the exports
1965
1966   """
1967   if os.path.isdir(constants.EXPORT_DIR):
1968     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1969   else:
1970     return []
1971
1972
1973 def RemoveExport(export):
1974   """Remove an existing export from the node.
1975
1976   @type export: str
1977   @param export: the name of the export to remove
1978   @rtype: boolean
1979   @return: the success of the operation
1980
1981   """
1982   target = os.path.join(constants.EXPORT_DIR, export)
1983
1984   shutil.rmtree(target)
1985   # TODO: catch some of the relevant exceptions and provide a pretty
1986   # error message if rmtree fails.
1987
1988   return True
1989
1990
1991 def BlockdevRename(devlist):
1992   """Rename a list of block devices.
1993
1994   @type devlist: list of tuples
1995   @param devlist: list of tuples of the form  (disk,
1996       new_logical_id, new_physical_id); disk is an
1997       L{objects.Disk} object describing the current disk,
1998       and new logical_id/physical_id is the name we
1999       rename it to
2000   @rtype: boolean
2001   @return: True if all renames succeeded, False otherwise
2002
2003   """
2004   msgs = []
2005   result = True
2006   for disk, unique_id in devlist:
2007     dev = _RecursiveFindBD(disk)
2008     if dev is None:
2009       msgs.append("Can't find device %s in rename" % str(disk))
2010       result = False
2011       continue
2012     try:
2013       old_rpath = dev.dev_path
2014       dev.Rename(unique_id)
2015       new_rpath = dev.dev_path
2016       if old_rpath != new_rpath:
2017         DevCacheManager.RemoveCache(old_rpath)
2018         # FIXME: we should add the new cache information here, like:
2019         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2020         # but we don't have the owner here - maybe parse from existing
2021         # cache? for now, we only lose lvm data when we rename, which
2022         # is less critical than DRBD or MD
2023     except errors.BlockDeviceError, err:
2024       msgs.append("Can't rename device '%s' to '%s': %s" %
2025                   (dev, unique_id, err))
2026       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2027       result = False
2028   return (result, "; ".join(msgs))
2029
2030
2031 def _TransformFileStorageDir(file_storage_dir):
2032   """Checks whether given file_storage_dir is valid.
2033
2034   Checks wheter the given file_storage_dir is within the cluster-wide
2035   default file_storage_dir stored in SimpleStore. Only paths under that
2036   directory are allowed.
2037
2038   @type file_storage_dir: str
2039   @param file_storage_dir: the path to check
2040
2041   @return: the normalized path if valid, None otherwise
2042
2043   """
2044   cfg = _GetConfig()
2045   file_storage_dir = os.path.normpath(file_storage_dir)
2046   base_file_storage_dir = cfg.GetFileStorageDir()
2047   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2048       base_file_storage_dir):
2049     logging.error("file storage directory '%s' is not under base file"
2050                   " storage directory '%s'",
2051                   file_storage_dir, base_file_storage_dir)
2052     return None
2053   return file_storage_dir
2054
2055
2056 def CreateFileStorageDir(file_storage_dir):
2057   """Create file storage directory.
2058
2059   @type file_storage_dir: str
2060   @param file_storage_dir: directory to create
2061
2062   @rtype: tuple
2063   @return: tuple with first element a boolean indicating wheter dir
2064       creation was successful or not
2065
2066   """
2067   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2068   result = True,
2069   if not file_storage_dir:
2070     result = False,
2071   else:
2072     if os.path.exists(file_storage_dir):
2073       if not os.path.isdir(file_storage_dir):
2074         logging.error("'%s' is not a directory", file_storage_dir)
2075         result = False,
2076     else:
2077       try:
2078         os.makedirs(file_storage_dir, 0750)
2079       except OSError, err:
2080         logging.error("Cannot create file storage directory '%s': %s",
2081                       file_storage_dir, err)
2082         result = False,
2083   return result
2084
2085
2086 def RemoveFileStorageDir(file_storage_dir):
2087   """Remove file storage directory.
2088
2089   Remove it only if it's empty. If not log an error and return.
2090
2091   @type file_storage_dir: str
2092   @param file_storage_dir: the directory we should cleanup
2093   @rtype: tuple (success,)
2094   @return: tuple of one element, C{success}, denoting
2095       whether the operation was successfull
2096
2097   """
2098   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2099   result = True,
2100   if not file_storage_dir:
2101     result = False,
2102   else:
2103     if os.path.exists(file_storage_dir):
2104       if not os.path.isdir(file_storage_dir):
2105         logging.error("'%s' is not a directory", file_storage_dir)
2106         result = False,
2107       # deletes dir only if empty, otherwise we want to return False
2108       try:
2109         os.rmdir(file_storage_dir)
2110       except OSError, err:
2111         logging.exception("Cannot remove file storage directory '%s'",
2112                           file_storage_dir)
2113         result = False,
2114   return result
2115
2116
2117 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2118   """Rename the file storage directory.
2119
2120   @type old_file_storage_dir: str
2121   @param old_file_storage_dir: the current path
2122   @type new_file_storage_dir: str
2123   @param new_file_storage_dir: the name we should rename to
2124   @rtype: tuple (success,)
2125   @return: tuple of one element, C{success}, denoting
2126       whether the operation was successful
2127
2128   """
2129   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2130   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2131   result = True,
2132   if not old_file_storage_dir or not new_file_storage_dir:
2133     result = False,
2134   else:
2135     if not os.path.exists(new_file_storage_dir):
2136       if os.path.isdir(old_file_storage_dir):
2137         try:
2138           os.rename(old_file_storage_dir, new_file_storage_dir)
2139         except OSError, err:
2140           logging.exception("Cannot rename '%s' to '%s'",
2141                             old_file_storage_dir, new_file_storage_dir)
2142           result =  False,
2143       else:
2144         logging.error("'%s' is not a directory", old_file_storage_dir)
2145         result = False,
2146     else:
2147       if os.path.exists(old_file_storage_dir):
2148         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2149                       old_file_storage_dir, new_file_storage_dir)
2150         result = False,
2151   return result
2152
2153
2154 def _IsJobQueueFile(file_name):
2155   """Checks whether the given filename is in the queue directory.
2156
2157   @type file_name: str
2158   @param file_name: the file name we should check
2159   @rtype: boolean
2160   @return: whether the file is under the queue directory
2161
2162   """
2163   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2164   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2165
2166   if not result:
2167     logging.error("'%s' is not a file in the queue directory",
2168                   file_name)
2169
2170   return result
2171
2172
2173 def JobQueueUpdate(file_name, content):
2174   """Updates a file in the queue directory.
2175
2176   This is just a wrapper over L{utils.WriteFile}, with proper
2177   checking.
2178
2179   @type file_name: str
2180   @param file_name: the job file name
2181   @type content: str
2182   @param content: the new job contents
2183   @rtype: boolean
2184   @return: the success of the operation
2185
2186   """
2187   if not _IsJobQueueFile(file_name):
2188     return False
2189
2190   # Write and replace the file atomically
2191   utils.WriteFile(file_name, data=_Decompress(content))
2192
2193   return True
2194
2195
2196 def JobQueueRename(old, new):
2197   """Renames a job queue file.
2198
2199   This is just a wrapper over os.rename with proper checking.
2200
2201   @type old: str
2202   @param old: the old (actual) file name
2203   @type new: str
2204   @param new: the desired file name
2205   @rtype: boolean
2206   @return: the success of the operation
2207
2208   """
2209   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2210     return False
2211
2212   utils.RenameFile(old, new, mkdir=True)
2213
2214   return True
2215
2216
2217 def JobQueueSetDrainFlag(drain_flag):
2218   """Set the drain flag for the queue.
2219
2220   This will set or unset the queue drain flag.
2221
2222   @type drain_flag: boolean
2223   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2224   @rtype: boolean
2225   @return: always True
2226   @warning: the function always returns True
2227
2228   """
2229   if drain_flag:
2230     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2231   else:
2232     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2233
2234   return True
2235
2236
2237 def BlockdevClose(instance_name, disks):
2238   """Closes the given block devices.
2239
2240   This means they will be switched to secondary mode (in case of
2241   DRBD).
2242
2243   @param instance_name: if the argument is not empty, the symlinks
2244       of this instance will be removed
2245   @type disks: list of L{objects.Disk}
2246   @param disks: the list of disks to be closed
2247   @rtype: tuple (success, message)
2248   @return: a tuple of success and message, where success
2249       indicates the succes of the operation, and message
2250       which will contain the error details in case we
2251       failed
2252
2253   """
2254   bdevs = []
2255   for cf in disks:
2256     rd = _RecursiveFindBD(cf)
2257     if rd is None:
2258       _Fail("Can't find device %s", cf)
2259     bdevs.append(rd)
2260
2261   msg = []
2262   for rd in bdevs:
2263     try:
2264       rd.Close()
2265     except errors.BlockDeviceError, err:
2266       msg.append(str(err))
2267   if msg:
2268     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2269   else:
2270     if instance_name:
2271       _RemoveBlockDevLinks(instance_name, disks)
2272     return (True, "All devices secondary")
2273
2274
2275 def ValidateHVParams(hvname, hvparams):
2276   """Validates the given hypervisor parameters.
2277
2278   @type hvname: string
2279   @param hvname: the hypervisor name
2280   @type hvparams: dict
2281   @param hvparams: the hypervisor parameters to be validated
2282   @rtype: tuple (success, message)
2283   @return: a tuple of success and message, where success
2284       indicates the succes of the operation, and message
2285       which will contain the error details in case we
2286       failed
2287
2288   """
2289   try:
2290     hv_type = hypervisor.GetHypervisor(hvname)
2291     hv_type.ValidateParameters(hvparams)
2292     return (True, "Validation passed")
2293   except errors.HypervisorError, err:
2294     return (False, str(err))
2295
2296
2297 def DemoteFromMC():
2298   """Demotes the current node from master candidate role.
2299
2300   """
2301   # try to ensure we're not the master by mistake
2302   master, myself = ssconf.GetMasterAndMyself()
2303   if master == myself:
2304     return (False, "ssconf status shows I'm the master node, will not demote")
2305   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2306   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2307     return (False, "The master daemon is running, will not demote")
2308   try:
2309     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2310   except EnvironmentError, err:
2311     if err.errno != errno.ENOENT:
2312       return (False, "Error while backing up cluster file: %s" % str(err))
2313   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2314   return (True, "Done")
2315
2316
2317 def _FindDisks(nodes_ip, disks):
2318   """Sets the physical ID on disks and returns the block devices.
2319
2320   """
2321   # set the correct physical ID
2322   my_name = utils.HostInfo().name
2323   for cf in disks:
2324     cf.SetPhysicalID(my_name, nodes_ip)
2325
2326   bdevs = []
2327
2328   for cf in disks:
2329     rd = _RecursiveFindBD(cf)
2330     if rd is None:
2331       return (False, "Can't find device %s" % cf)
2332     bdevs.append(rd)
2333   return (True, bdevs)
2334
2335
2336 def DrbdDisconnectNet(nodes_ip, disks):
2337   """Disconnects the network on a list of drbd devices.
2338
2339   """
2340   status, bdevs = _FindDisks(nodes_ip, disks)
2341   if not status:
2342     return status, bdevs
2343
2344   # disconnect disks
2345   for rd in bdevs:
2346     try:
2347       rd.DisconnectNet()
2348     except errors.BlockDeviceError, err:
2349       _Fail("Can't change network configuration to standalone mode: %s",
2350             err, exc=True)
2351   return (True, "All disks are now disconnected")
2352
2353
2354 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2355   """Attaches the network on a list of drbd devices.
2356
2357   """
2358   status, bdevs = _FindDisks(nodes_ip, disks)
2359   if not status:
2360     return status, bdevs
2361
2362   if multimaster:
2363     for idx, rd in enumerate(bdevs):
2364       try:
2365         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2366       except EnvironmentError, err:
2367         _Fail("Can't create symlink: %s", err)
2368   # reconnect disks, switch to new master configuration and if
2369   # needed primary mode
2370   for rd in bdevs:
2371     try:
2372       rd.AttachNet(multimaster)
2373     except errors.BlockDeviceError, err:
2374       _Fail("Can't change network configuration: %s", err)
2375   # wait until the disks are connected; we need to retry the re-attach
2376   # if the device becomes standalone, as this might happen if the one
2377   # node disconnects and reconnects in a different mode before the
2378   # other node reconnects; in this case, one or both of the nodes will
2379   # decide it has wrong configuration and switch to standalone
2380   RECONNECT_TIMEOUT = 2 * 60
2381   sleep_time = 0.100 # start with 100 miliseconds
2382   timeout_limit = time.time() + RECONNECT_TIMEOUT
2383   while time.time() < timeout_limit:
2384     all_connected = True
2385     for rd in bdevs:
2386       stats = rd.GetProcStatus()
2387       if not (stats.is_connected or stats.is_in_resync):
2388         all_connected = False
2389       if stats.is_standalone:
2390         # peer had different config info and this node became
2391         # standalone, even though this should not happen with the
2392         # new staged way of changing disk configs
2393         try:
2394           rd.ReAttachNet(multimaster)
2395         except errors.BlockDeviceError, err:
2396           _Fail("Can't change network configuration: %s", err)
2397     if all_connected:
2398       break
2399     time.sleep(sleep_time)
2400     sleep_time = min(5, sleep_time * 1.5)
2401   if not all_connected:
2402     return (False, "Timeout in disk reconnecting")
2403   if multimaster:
2404     # change to primary mode
2405     for rd in bdevs:
2406       try:
2407         rd.Open()
2408       except errors.BlockDeviceError, err:
2409         _Fail("Can't change to primary mode: %s", err)
2410   if multimaster:
2411     msg = "multi-master and primary"
2412   else:
2413     msg = "single-master"
2414   return (True, "Disks are now configured as %s" % msg)
2415
2416
2417 def DrbdWaitSync(nodes_ip, disks):
2418   """Wait until DRBDs have synchronized.
2419
2420   """
2421   status, bdevs = _FindDisks(nodes_ip, disks)
2422   if not status:
2423     return status, bdevs
2424
2425   min_resync = 100
2426   alldone = True
2427   failure = False
2428   for rd in bdevs:
2429     stats = rd.GetProcStatus()
2430     if not (stats.is_connected or stats.is_in_resync):
2431       failure = True
2432       break
2433     alldone = alldone and (not stats.is_in_resync)
2434     if stats.sync_percent is not None:
2435       min_resync = min(min_resync, stats.sync_percent)
2436   return (not failure, (alldone, min_resync))
2437
2438
2439 def PowercycleNode(hypervisor_type):
2440   """Hard-powercycle the node.
2441
2442   Because we need to return first, and schedule the powercycle in the
2443   background, we won't be able to report failures nicely.
2444
2445   """
2446   hyper = hypervisor.GetHypervisor(hypervisor_type)
2447   try:
2448     pid = os.fork()
2449   except OSError, err:
2450     # if we can't fork, we'll pretend that we're in the child process
2451     pid = 0
2452   if pid > 0:
2453     return (True, "Reboot scheduled in 5 seconds")
2454   time.sleep(5)
2455   hyper.PowercycleNode()
2456
2457
2458 class HooksRunner(object):
2459   """Hook runner.
2460
2461   This class is instantiated on the node side (ganeti-noded) and not
2462   on the master side.
2463
2464   """
2465   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2466
2467   def __init__(self, hooks_base_dir=None):
2468     """Constructor for hooks runner.
2469
2470     @type hooks_base_dir: str or None
2471     @param hooks_base_dir: if not None, this overrides the
2472         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2473
2474     """
2475     if hooks_base_dir is None:
2476       hooks_base_dir = constants.HOOKS_BASE_DIR
2477     self._BASE_DIR = hooks_base_dir
2478
2479   @staticmethod
2480   def ExecHook(script, env):
2481     """Exec one hook script.
2482
2483     @type script: str
2484     @param script: the full path to the script
2485     @type env: dict
2486     @param env: the environment with which to exec the script
2487     @rtype: tuple (success, message)
2488     @return: a tuple of success and message, where success
2489         indicates the succes of the operation, and message
2490         which will contain the error details in case we
2491         failed
2492
2493     """
2494     # exec the process using subprocess and log the output
2495     fdstdin = None
2496     try:
2497       fdstdin = open("/dev/null", "r")
2498       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2499                                stderr=subprocess.STDOUT, close_fds=True,
2500                                shell=False, cwd="/", env=env)
2501       output = ""
2502       try:
2503         output = child.stdout.read(4096)
2504         child.stdout.close()
2505       except EnvironmentError, err:
2506         output += "Hook script error: %s" % str(err)
2507
2508       while True:
2509         try:
2510           result = child.wait()
2511           break
2512         except EnvironmentError, err:
2513           if err.errno == errno.EINTR:
2514             continue
2515           raise
2516     finally:
2517       # try not to leak fds
2518       for fd in (fdstdin, ):
2519         if fd is not None:
2520           try:
2521             fd.close()
2522           except EnvironmentError, err:
2523             # just log the error
2524             #logging.exception("Error while closing fd %s", fd)
2525             pass
2526
2527     return result == 0, utils.SafeEncode(output.strip())
2528
2529   def RunHooks(self, hpath, phase, env):
2530     """Run the scripts in the hooks directory.
2531
2532     @type hpath: str
2533     @param hpath: the path to the hooks directory which
2534         holds the scripts
2535     @type phase: str
2536     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2537         L{constants.HOOKS_PHASE_POST}
2538     @type env: dict
2539     @param env: dictionary with the environment for the hook
2540     @rtype: list
2541     @return: list of 3-element tuples:
2542       - script path
2543       - script result, either L{constants.HKR_SUCCESS} or
2544         L{constants.HKR_FAIL}
2545       - output of the script
2546
2547     @raise errors.ProgrammerError: for invalid input
2548         parameters
2549
2550     """
2551     if phase == constants.HOOKS_PHASE_PRE:
2552       suffix = "pre"
2553     elif phase == constants.HOOKS_PHASE_POST:
2554       suffix = "post"
2555     else:
2556       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2557     rr = []
2558
2559     subdir = "%s-%s.d" % (hpath, suffix)
2560     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2561     try:
2562       dir_contents = utils.ListVisibleFiles(dir_name)
2563     except OSError, err:
2564       # FIXME: must log output in case of failures
2565       return rr
2566
2567     # we use the standard python sort order,
2568     # so 00name is the recommended naming scheme
2569     dir_contents.sort()
2570     for relname in dir_contents:
2571       fname = os.path.join(dir_name, relname)
2572       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2573           self.RE_MASK.match(relname) is not None):
2574         rrval = constants.HKR_SKIP
2575         output = ""
2576       else:
2577         result, output = self.ExecHook(fname, env)
2578         if not result:
2579           rrval = constants.HKR_FAIL
2580         else:
2581           rrval = constants.HKR_SUCCESS
2582       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2583
2584     return rr
2585
2586
2587 class IAllocatorRunner(object):
2588   """IAllocator runner.
2589
2590   This class is instantiated on the node side (ganeti-noded) and not on
2591   the master side.
2592
2593   """
2594   def Run(self, name, idata):
2595     """Run an iallocator script.
2596
2597     @type name: str
2598     @param name: the iallocator script name
2599     @type idata: str
2600     @param idata: the allocator input data
2601
2602     @rtype: tuple
2603     @return: four element tuple of:
2604        - run status (one of the IARUN_ constants)
2605        - stdout
2606        - stderr
2607        - fail reason (as from L{utils.RunResult})
2608
2609     """
2610     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2611                                   os.path.isfile)
2612     if alloc_script is None:
2613       return (constants.IARUN_NOTFOUND, None, None, None)
2614
2615     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2616     try:
2617       os.write(fd, idata)
2618       os.close(fd)
2619       result = utils.RunCmd([alloc_script, fin_name])
2620       if result.failed:
2621         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2622                 result.fail_reason)
2623     finally:
2624       os.unlink(fin_name)
2625
2626     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2627
2628
2629 class DevCacheManager(object):
2630   """Simple class for managing a cache of block device information.
2631
2632   """
2633   _DEV_PREFIX = "/dev/"
2634   _ROOT_DIR = constants.BDEV_CACHE_DIR
2635
2636   @classmethod
2637   def _ConvertPath(cls, dev_path):
2638     """Converts a /dev/name path to the cache file name.
2639
2640     This replaces slashes with underscores and strips the /dev
2641     prefix. It then returns the full path to the cache file.
2642
2643     @type dev_path: str
2644     @param dev_path: the C{/dev/} path name
2645     @rtype: str
2646     @return: the converted path name
2647
2648     """
2649     if dev_path.startswith(cls._DEV_PREFIX):
2650       dev_path = dev_path[len(cls._DEV_PREFIX):]
2651     dev_path = dev_path.replace("/", "_")
2652     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2653     return fpath
2654
2655   @classmethod
2656   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2657     """Updates the cache information for a given device.
2658
2659     @type dev_path: str
2660     @param dev_path: the pathname of the device
2661     @type owner: str
2662     @param owner: the owner (instance name) of the device
2663     @type on_primary: bool
2664     @param on_primary: whether this is the primary
2665         node nor not
2666     @type iv_name: str
2667     @param iv_name: the instance-visible name of the
2668         device, as in objects.Disk.iv_name
2669
2670     @rtype: None
2671
2672     """
2673     if dev_path is None:
2674       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2675       return
2676     fpath = cls._ConvertPath(dev_path)
2677     if on_primary:
2678       state = "primary"
2679     else:
2680       state = "secondary"
2681     if iv_name is None:
2682       iv_name = "not_visible"
2683     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2684     try:
2685       utils.WriteFile(fpath, data=fdata)
2686     except EnvironmentError, err:
2687       logging.exception("Can't update bdev cache for %s", dev_path)
2688
2689   @classmethod
2690   def RemoveCache(cls, dev_path):
2691     """Remove data for a dev_path.
2692
2693     This is just a wrapper over L{utils.RemoveFile} with a converted
2694     path name and logging.
2695
2696     @type dev_path: str
2697     @param dev_path: the pathname of the device
2698
2699     @rtype: None
2700
2701     """
2702     if dev_path is None:
2703       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2704       return
2705     fpath = cls._ConvertPath(dev_path)
2706     try:
2707       utils.RemoveFile(fpath)
2708     except EnvironmentError, err:
2709       logging.exception("Can't update bdev cache for %s", dev_path)