9ebab3b720c2a2393491d0d6e3d66b7abd0a322d
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     msg = "Error while processing user ssh files"
264     logging.exception(msg)
265     return (False, "%s: %s" % (msg, err))
266
267   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
268     utils.WriteFile(name, data=content, mode=0600)
269
270   utils.AddAuthorizedKey(auth_keys, sshpub)
271
272   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
273
274   return (True, "Node added successfully")
275
276
277 def LeaveCluster():
278   """Cleans up and remove the current node.
279
280   This function cleans up and prepares the current node to be removed
281   from the cluster.
282
283   If processing is successful, then it raises an
284   L{errors.QuitGanetiException} which is used as a special case to
285   shutdown the node daemon.
286
287   """
288   _CleanDirectory(constants.DATA_DIR)
289   JobQueuePurge()
290
291   try:
292     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
293   except errors.OpExecError:
294     logging.exception("Error while processing ssh files")
295     return
296
297   f = open(pub_key, 'r')
298   try:
299     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
300   finally:
301     f.close()
302
303   utils.RemoveFile(priv_key)
304   utils.RemoveFile(pub_key)
305
306   # Return a reassuring string to the caller, and quit
307   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
308
309
310 def GetNodeInfo(vgname, hypervisor_type):
311   """Gives back a hash with different informations about the node.
312
313   @type vgname: C{string}
314   @param vgname: the name of the volume group to ask for disk space information
315   @type hypervisor_type: C{str}
316   @param hypervisor_type: the name of the hypervisor to ask for
317       memory information
318   @rtype: C{dict}
319   @return: dictionary with the following keys:
320       - vg_size is the size of the configured volume group in MiB
321       - vg_free is the free size of the volume group in MiB
322       - memory_dom0 is the memory allocated for domain0 in MiB
323       - memory_free is the currently available (free) ram in MiB
324       - memory_total is the total number of ram in MiB
325
326   """
327   outputarray = {}
328   vginfo = _GetVGInfo(vgname)
329   outputarray['vg_size'] = vginfo['vg_size']
330   outputarray['vg_free'] = vginfo['vg_free']
331
332   hyper = hypervisor.GetHypervisor(hypervisor_type)
333   hyp_info = hyper.GetNodeInfo()
334   if hyp_info is not None:
335     outputarray.update(hyp_info)
336
337   f = open("/proc/sys/kernel/random/boot_id", 'r')
338   try:
339     outputarray["bootid"] = f.read(128).rstrip("\n")
340   finally:
341     f.close()
342
343   return outputarray
344
345
346 def VerifyNode(what, cluster_name):
347   """Verify the status of the local node.
348
349   Based on the input L{what} parameter, various checks are done on the
350   local node.
351
352   If the I{filelist} key is present, this list of
353   files is checksummed and the file/checksum pairs are returned.
354
355   If the I{nodelist} key is present, we check that we have
356   connectivity via ssh with the target nodes (and check the hostname
357   report).
358
359   If the I{node-net-test} key is present, we check that we have
360   connectivity to the given nodes via both primary IP and, if
361   applicable, secondary IPs.
362
363   @type what: C{dict}
364   @param what: a dictionary of things to check:
365       - filelist: list of files for which to compute checksums
366       - nodelist: list of nodes we should check ssh communication with
367       - node-net-test: list of nodes we should check node daemon port
368         connectivity with
369       - hypervisor: list with hypervisors to run the verify for
370   @rtype: dict
371   @return: a dictionary with the same keys as the input dict, and
372       values representing the result of the checks
373
374   """
375   result = {}
376
377   if constants.NV_HYPERVISOR in what:
378     result[constants.NV_HYPERVISOR] = tmp = {}
379     for hv_name in what[constants.NV_HYPERVISOR]:
380       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
381
382   if constants.NV_FILELIST in what:
383     result[constants.NV_FILELIST] = utils.FingerprintFiles(
384       what[constants.NV_FILELIST])
385
386   if constants.NV_NODELIST in what:
387     result[constants.NV_NODELIST] = tmp = {}
388     random.shuffle(what[constants.NV_NODELIST])
389     for node in what[constants.NV_NODELIST]:
390       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
391       if not success:
392         tmp[node] = message
393
394   if constants.NV_NODENETTEST in what:
395     result[constants.NV_NODENETTEST] = tmp = {}
396     my_name = utils.HostInfo().name
397     my_pip = my_sip = None
398     for name, pip, sip in what[constants.NV_NODENETTEST]:
399       if name == my_name:
400         my_pip = pip
401         my_sip = sip
402         break
403     if not my_pip:
404       tmp[my_name] = ("Can't find my own primary/secondary IP"
405                       " in the node list")
406     else:
407       port = utils.GetNodeDaemonPort()
408       for name, pip, sip in what[constants.NV_NODENETTEST]:
409         fail = []
410         if not utils.TcpPing(pip, port, source=my_pip):
411           fail.append("primary")
412         if sip != pip:
413           if not utils.TcpPing(sip, port, source=my_sip):
414             fail.append("secondary")
415         if fail:
416           tmp[name] = ("failure using the %s interface(s)" %
417                        " and ".join(fail))
418
419   if constants.NV_LVLIST in what:
420     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
421
422   if constants.NV_INSTANCELIST in what:
423     result[constants.NV_INSTANCELIST] = GetInstanceList(
424       what[constants.NV_INSTANCELIST])
425
426   if constants.NV_VGLIST in what:
427     result[constants.NV_VGLIST] = ListVolumeGroups()
428
429   if constants.NV_VERSION in what:
430     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
431                                     constants.RELEASE_VERSION)
432
433   if constants.NV_HVINFO in what:
434     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
435     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
436
437   if constants.NV_DRBDLIST in what:
438     try:
439       used_minors = bdev.DRBD8.GetUsedDevs().keys()
440     except errors.BlockDeviceError:
441       logging.warning("Can't get used minors list", exc_info=True)
442       used_minors = []
443     result[constants.NV_DRBDLIST] = used_minors
444
445   return result
446
447
448 def GetVolumeList(vg_name):
449   """Compute list of logical volumes and their size.
450
451   @type vg_name: str
452   @param vg_name: the volume group whose LVs we should list
453   @rtype: dict
454   @return:
455       dictionary of all partions (key) with value being a tuple of
456       their size (in MiB), inactive and online status::
457
458         {'test1': ('20.06', True, True)}
459
460       in case of errors, a string is returned with the error
461       details.
462
463   """
464   lvs = {}
465   sep = '|'
466   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
467                          "--separator=%s" % sep,
468                          "-olv_name,lv_size,lv_attr", vg_name])
469   if result.failed:
470     logging.error("Failed to list logical volumes, lvs output: %s",
471                   result.output)
472     return result.output
473
474   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
475   for line in result.stdout.splitlines():
476     line = line.strip()
477     match = valid_line_re.match(line)
478     if not match:
479       logging.error("Invalid line returned from lvs output: '%s'", line)
480       continue
481     name, size, attr = match.groups()
482     inactive = attr[4] == '-'
483     online = attr[5] == 'o'
484     lvs[name] = (size, inactive, online)
485
486   return lvs
487
488
489 def ListVolumeGroups():
490   """List the volume groups and their size.
491
492   @rtype: dict
493   @return: dictionary with keys volume name and values the
494       size of the volume
495
496   """
497   return utils.ListVolumeGroups()
498
499
500 def NodeVolumes():
501   """List all volumes on this node.
502
503   @rtype: list
504   @return:
505     A list of dictionaries, each having four keys:
506       - name: the logical volume name,
507       - size: the size of the logical volume
508       - dev: the physical device on which the LV lives
509       - vg: the volume group to which it belongs
510
511     In case of errors, we return an empty list and log the
512     error.
513
514     Note that since a logical volume can live on multiple physical
515     volumes, the resulting list might include a logical volume
516     multiple times.
517
518   """
519   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
520                          "--separator=|",
521                          "--options=lv_name,lv_size,devices,vg_name"])
522   if result.failed:
523     logging.error("Failed to list logical volumes, lvs output: %s",
524                   result.output)
525     return []
526
527   def parse_dev(dev):
528     if '(' in dev:
529       return dev.split('(')[0]
530     else:
531       return dev
532
533   def map_line(line):
534     return {
535       'name': line[0].strip(),
536       'size': line[1].strip(),
537       'dev': parse_dev(line[2].strip()),
538       'vg': line[3].strip(),
539     }
540
541   return [map_line(line.split('|')) for line in result.stdout.splitlines()
542           if line.count('|') >= 3]
543
544
545 def BridgesExist(bridges_list):
546   """Check if a list of bridges exist on the current node.
547
548   @rtype: boolean
549   @return: C{True} if all of them exist, C{False} otherwise
550
551   """
552   for bridge in bridges_list:
553     if not utils.BridgeExists(bridge):
554       return False
555
556   return True
557
558
559 def GetInstanceList(hypervisor_list):
560   """Provides a list of instances.
561
562   @type hypervisor_list: list
563   @param hypervisor_list: the list of hypervisors to query information
564
565   @rtype: list
566   @return: a list of all running instances on the current node
567     - instance1.example.com
568     - instance2.example.com
569
570   """
571   results = []
572   for hname in hypervisor_list:
573     try:
574       names = hypervisor.GetHypervisor(hname).ListInstances()
575       results.extend(names)
576     except errors.HypervisorError, err:
577       logging.exception("Error enumerating instances for hypevisor %s", hname)
578       # FIXME: should we somehow not propagate this to the master?
579       raise
580
581   return results
582
583
584 def GetInstanceInfo(instance, hname):
585   """Gives back the informations about an instance as a dictionary.
586
587   @type instance: string
588   @param instance: the instance name
589   @type hname: string
590   @param hname: the hypervisor type of the instance
591
592   @rtype: dict
593   @return: dictionary with the following keys:
594       - memory: memory size of instance (int)
595       - state: xen state of instance (string)
596       - time: cpu time of instance (float)
597
598   """
599   output = {}
600
601   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
602   if iinfo is not None:
603     output['memory'] = iinfo[2]
604     output['state'] = iinfo[4]
605     output['time'] = iinfo[5]
606
607   return output
608
609
610 def GetInstanceMigratable(instance):
611   """Gives whether an instance can be migrated.
612
613   @type instance: L{objects.Instance}
614   @param instance: object representing the instance to be checked.
615
616   @rtype: tuple
617   @return: tuple of (result, description) where:
618       - result: whether the instance can be migrated or not
619       - description: a description of the issue, if relevant
620
621   """
622   hyper = hypervisor.GetHypervisor(instance.hypervisor)
623   if instance.name not in hyper.ListInstances():
624     return (False, 'not running')
625
626   for idx in range(len(instance.disks)):
627     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
628     if not os.path.islink(link_name):
629       return (False, 'not restarted since ganeti 1.2.5')
630
631   return (True, '')
632
633
634 def GetAllInstancesInfo(hypervisor_list):
635   """Gather data about all instances.
636
637   This is the equivalent of L{GetInstanceInfo}, except that it
638   computes data for all instances at once, thus being faster if one
639   needs data about more than one instance.
640
641   @type hypervisor_list: list
642   @param hypervisor_list: list of hypervisors to query for instance data
643
644   @rtype: dict
645   @return: dictionary of instance: data, with data having the following keys:
646       - memory: memory size of instance (int)
647       - state: xen state of instance (string)
648       - time: cpu time of instance (float)
649       - vcpus: the number of vcpus
650
651   """
652   output = {}
653
654   for hname in hypervisor_list:
655     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
656     if iinfo:
657       for name, inst_id, memory, vcpus, state, times in iinfo:
658         value = {
659           'memory': memory,
660           'vcpus': vcpus,
661           'state': state,
662           'time': times,
663           }
664         if name in output and output[name] != value:
665           raise errors.HypervisorError("Instance %s running duplicate"
666                                        " with different parameters" % name)
667         output[name] = value
668
669   return output
670
671
672 def InstanceOsAdd(instance):
673   """Add an OS to an instance.
674
675   @type instance: L{objects.Instance}
676   @param instance: Instance whose OS is to be installed
677   @rtype: boolean
678   @return: the success of the operation
679
680   """
681   try:
682     inst_os = OSFromDisk(instance.os)
683   except errors.InvalidOS, err:
684     os_name, os_dir, os_err = err.args
685     if os_dir is None:
686       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
687     else:
688       return (False, "Error parsing OS '%s' in directory %s: %s" %
689               (os_name, os_dir, os_err))
690
691   create_env = OSEnvironment(instance)
692
693   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
694                                      instance.name, int(time.time()))
695
696   result = utils.RunCmd([inst_os.create_script], env=create_env,
697                         cwd=inst_os.path, output=logfile,)
698   if result.failed:
699     logging.error("os create command '%s' returned error: %s, logfile: %s,"
700                   " output: %s", result.cmd, result.fail_reason, logfile,
701                   result.output)
702     lines = [utils.SafeEncode(val)
703              for val in utils.TailFile(logfile, lines=20)]
704     return (False, "OS create script failed (%s), last lines in the"
705             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
706
707   return (True, "Successfully installed")
708
709
710 def RunRenameInstance(instance, old_name):
711   """Run the OS rename script for an instance.
712
713   @type instance: L{objects.Instance}
714   @param instance: Instance whose OS is to be installed
715   @type old_name: string
716   @param old_name: previous instance name
717   @rtype: boolean
718   @return: the success of the operation
719
720   """
721   inst_os = OSFromDisk(instance.os)
722
723   rename_env = OSEnvironment(instance)
724   rename_env['OLD_INSTANCE_NAME'] = old_name
725
726   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
727                                            old_name,
728                                            instance.name, int(time.time()))
729
730   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
731                         cwd=inst_os.path, output=logfile)
732
733   if result.failed:
734     logging.error("os create command '%s' returned error: %s output: %s",
735                   result.cmd, result.fail_reason, result.output)
736     lines = [utils.SafeEncode(val)
737              for val in utils.TailFile(logfile, lines=20)]
738     return (False, "OS rename script failed (%s), last lines in the"
739             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
740
741   return (True, "Rename successful")
742
743
744 def _GetVGInfo(vg_name):
745   """Get informations about the volume group.
746
747   @type vg_name: str
748   @param vg_name: the volume group which we query
749   @rtype: dict
750   @return:
751     A dictionary with the following keys:
752       - C{vg_size} is the total size of the volume group in MiB
753       - C{vg_free} is the free size of the volume group in MiB
754       - C{pv_count} are the number of physical disks in that VG
755
756     If an error occurs during gathering of data, we return the same dict
757     with keys all set to None.
758
759   """
760   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
761
762   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
763                          "--nosuffix", "--units=m", "--separator=:", vg_name])
764
765   if retval.failed:
766     logging.error("volume group %s not present", vg_name)
767     return retdic
768   valarr = retval.stdout.strip().rstrip(':').split(':')
769   if len(valarr) == 3:
770     try:
771       retdic = {
772         "vg_size": int(round(float(valarr[0]), 0)),
773         "vg_free": int(round(float(valarr[1]), 0)),
774         "pv_count": int(valarr[2]),
775         }
776     except ValueError, err:
777       logging.exception("Fail to parse vgs output")
778   else:
779     logging.error("vgs output has the wrong number of fields (expected"
780                   " three): %s", str(valarr))
781   return retdic
782
783
784 def _GetBlockDevSymlinkPath(instance_name, idx):
785   return os.path.join(constants.DISK_LINKS_DIR,
786                       "%s:%d" % (instance_name, idx))
787
788
789 def _SymlinkBlockDev(instance_name, device_path, idx):
790   """Set up symlinks to a instance's block device.
791
792   This is an auxiliary function run when an instance is start (on the primary
793   node) or when an instance is migrated (on the target node).
794
795
796   @param instance_name: the name of the target instance
797   @param device_path: path of the physical block device, on the node
798   @param idx: the disk index
799   @return: absolute path to the disk's symlink
800
801   """
802   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
803   try:
804     os.symlink(device_path, link_name)
805   except OSError, err:
806     if err.errno == errno.EEXIST:
807       if (not os.path.islink(link_name) or
808           os.readlink(link_name) != device_path):
809         os.remove(link_name)
810         os.symlink(device_path, link_name)
811     else:
812       raise
813
814   return link_name
815
816
817 def _RemoveBlockDevLinks(instance_name, disks):
818   """Remove the block device symlinks belonging to the given instance.
819
820   """
821   for idx, disk in enumerate(disks):
822     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
823     if os.path.islink(link_name):
824       try:
825         os.remove(link_name)
826       except OSError:
827         logging.exception("Can't remove symlink '%s'", link_name)
828
829
830 def _GatherAndLinkBlockDevs(instance):
831   """Set up an instance's block device(s).
832
833   This is run on the primary node at instance startup. The block
834   devices must be already assembled.
835
836   @type instance: L{objects.Instance}
837   @param instance: the instance whose disks we shoul assemble
838   @rtype: list
839   @return: list of (disk_object, device_path)
840
841   """
842   block_devices = []
843   for idx, disk in enumerate(instance.disks):
844     device = _RecursiveFindBD(disk)
845     if device is None:
846       raise errors.BlockDeviceError("Block device '%s' is not set up." %
847                                     str(disk))
848     device.Open()
849     try:
850       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
851     except OSError, e:
852       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
853                                     e.strerror)
854
855     block_devices.append((disk, link_name))
856
857   return block_devices
858
859
860 def StartInstance(instance, extra_args):
861   """Start an instance.
862
863   @type instance: L{objects.Instance}
864   @param instance: the instance object
865   @rtype: boolean
866   @return: whether the startup was successful or not
867
868   """
869   running_instances = GetInstanceList([instance.hypervisor])
870
871   if instance.name in running_instances:
872     return (True, "Already running")
873
874   try:
875     block_devices = _GatherAndLinkBlockDevs(instance)
876     hyper = hypervisor.GetHypervisor(instance.hypervisor)
877     hyper.StartInstance(instance, block_devices, extra_args)
878   except errors.BlockDeviceError, err:
879     logging.exception("Failed to start instance")
880     return (False, "Block device error: %s" % str(err))
881   except errors.HypervisorError, err:
882     logging.exception("Failed to start instance")
883     _RemoveBlockDevLinks(instance.name, instance.disks)
884     return (False, "Hypervisor error: %s" % str(err))
885
886   return (True, "Instance started successfully")
887
888
889 def ShutdownInstance(instance):
890   """Shut an instance down.
891
892   @note: this functions uses polling with a hardcoded timeout.
893
894   @type instance: L{objects.Instance}
895   @param instance: the instance object
896   @rtype: boolean
897   @return: whether the startup was successful or not
898
899   """
900   hv_name = instance.hypervisor
901   running_instances = GetInstanceList([hv_name])
902
903   if instance.name not in running_instances:
904     return True
905
906   hyper = hypervisor.GetHypervisor(hv_name)
907   try:
908     hyper.StopInstance(instance)
909   except errors.HypervisorError, err:
910     logging.error("Failed to stop instance: %s" % err)
911     return False
912
913   # test every 10secs for 2min
914
915   time.sleep(1)
916   for dummy in range(11):
917     if instance.name not in GetInstanceList([hv_name]):
918       break
919     time.sleep(10)
920   else:
921     # the shutdown did not succeed
922     logging.error("Shutdown of '%s' unsuccessful, using destroy",
923                   instance.name)
924
925     try:
926       hyper.StopInstance(instance, force=True)
927     except errors.HypervisorError, err:
928       logging.exception("Failed to stop instance: %s" % err)
929       return False
930
931     time.sleep(1)
932     if instance.name in GetInstanceList([hv_name]):
933       logging.error("Could not shutdown instance '%s' even by destroy",
934                     instance.name)
935       return False
936
937   _RemoveBlockDevLinks(instance.name, instance.disks)
938
939   return True
940
941
942 def RebootInstance(instance, reboot_type, extra_args):
943   """Reboot an instance.
944
945   @type instance: L{objects.Instance}
946   @param instance: the instance object to reboot
947   @type reboot_type: str
948   @param reboot_type: the type of reboot, one the following
949     constants:
950       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
951         instance OS, do not recreate the VM
952       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
953         restart the VM (at the hypervisor level)
954       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
955         is not accepted here, since that mode is handled
956         differently
957   @rtype: boolean
958   @return: the success of the operation
959
960   """
961   running_instances = GetInstanceList([instance.hypervisor])
962
963   if instance.name not in running_instances:
964     logging.error("Cannot reboot instance that is not running")
965     return False
966
967   hyper = hypervisor.GetHypervisor(instance.hypervisor)
968   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
969     try:
970       hyper.RebootInstance(instance)
971     except errors.HypervisorError, err:
972       logging.exception("Failed to soft reboot instance")
973       return False
974   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
975     try:
976       ShutdownInstance(instance)
977       StartInstance(instance, extra_args)
978     except errors.HypervisorError, err:
979       logging.exception("Failed to hard reboot instance")
980       return False
981   else:
982     raise errors.ParameterError("reboot_type invalid")
983
984   return True
985
986
987 def MigrationInfo(instance):
988   """Gather information about an instance to be migrated.
989
990   @type instance: L{objects.Instance}
991   @param instance: the instance definition
992
993   """
994   hyper = hypervisor.GetHypervisor(instance.hypervisor)
995   try:
996     info = hyper.MigrationInfo(instance)
997   except errors.HypervisorError, err:
998     msg = "Failed to fetch migration information"
999     logging.exception(msg)
1000     return (False, '%s: %s' % (msg, err))
1001   return (True, info)
1002
1003
1004 def AcceptInstance(instance, info, target):
1005   """Prepare the node to accept an instance.
1006
1007   @type instance: L{objects.Instance}
1008   @param instance: the instance definition
1009   @type info: string/data (opaque)
1010   @param info: migration information, from the source node
1011   @type target: string
1012   @param target: target host (usually ip), on this node
1013
1014   """
1015   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1016   try:
1017     hyper.AcceptInstance(instance, info, target)
1018   except errors.HypervisorError, err:
1019     msg = "Failed to accept instance"
1020     logging.exception(msg)
1021     return (False, '%s: %s' % (msg, err))
1022   return (True, "Accept successfull")
1023
1024
1025 def FinalizeMigration(instance, info, success):
1026   """Finalize any preparation to accept an instance.
1027
1028   @type instance: L{objects.Instance}
1029   @param instance: the instance definition
1030   @type info: string/data (opaque)
1031   @param info: migration information, from the source node
1032   @type success: boolean
1033   @param success: whether the migration was a success or a failure
1034
1035   """
1036   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1037   try:
1038     hyper.FinalizeMigration(instance, info, success)
1039   except errors.HypervisorError, err:
1040     msg = "Failed to finalize migration"
1041     logging.exception(msg)
1042     return (False, '%s: %s' % (msg, err))
1043   return (True, "Migration Finalized")
1044
1045
1046 def MigrateInstance(instance, target, live):
1047   """Migrates an instance to another node.
1048
1049   @type instance: L{objects.Instance}
1050   @param instance: the instance definition
1051   @type target: string
1052   @param target: the target node name
1053   @type live: boolean
1054   @param live: whether the migration should be done live or not (the
1055       interpretation of this parameter is left to the hypervisor)
1056   @rtype: tuple
1057   @return: a tuple of (success, msg) where:
1058       - succes is a boolean denoting the success/failure of the operation
1059       - msg is a string with details in case of failure
1060
1061   """
1062   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1063
1064   try:
1065     hyper.MigrateInstance(instance.name, target, live)
1066   except errors.HypervisorError, err:
1067     msg = "Failed to migrate instance"
1068     logging.exception(msg)
1069     return (False, "%s: %s" % (msg, err))
1070   return (True, "Migration successfull")
1071
1072
1073 def BlockdevCreate(disk, size, owner, on_primary, info):
1074   """Creates a block device for an instance.
1075
1076   @type disk: L{objects.Disk}
1077   @param disk: the object describing the disk we should create
1078   @type size: int
1079   @param size: the size of the physical underlying device, in MiB
1080   @type owner: str
1081   @param owner: the name of the instance for which disk is created,
1082       used for device cache data
1083   @type on_primary: boolean
1084   @param on_primary:  indicates if it is the primary node or not
1085   @type info: string
1086   @param info: string that will be sent to the physical device
1087       creation, used for example to set (LVM) tags on LVs
1088
1089   @return: the new unique_id of the device (this can sometime be
1090       computed only after creation), or None. On secondary nodes,
1091       it's not required to return anything.
1092
1093   """
1094   clist = []
1095   if disk.children:
1096     for child in disk.children:
1097       try:
1098         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1099       except errors.BlockDeviceError, err:
1100         errmsg = "Can't assemble device %s: %s" % (child, err)
1101         logging.error(errmsg)
1102         return False, errmsg
1103       if on_primary or disk.AssembleOnSecondary():
1104         # we need the children open in case the device itself has to
1105         # be assembled
1106         try:
1107           crdev.Open()
1108         except errors.BlockDeviceError, err:
1109           errmsg = "Can't make child '%s' read-write: %s" % (child, err)
1110           logging.error(errmsg)
1111           return False, errmsg
1112       clist.append(crdev)
1113
1114   try:
1115     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1116   except errors.BlockDeviceError, err:
1117     return False, "Can't create block device: %s" % str(err)
1118
1119   if on_primary or disk.AssembleOnSecondary():
1120     try:
1121       device.Assemble()
1122     except errors.BlockDeviceError, err:
1123       errmsg = ("Can't assemble device after creation, very"
1124                 " unusual event: %s" % str(err))
1125       logging.error(errmsg)
1126       return False, errmsg
1127     device.SetSyncSpeed(constants.SYNC_SPEED)
1128     if on_primary or disk.OpenOnSecondary():
1129       try:
1130         device.Open(force=True)
1131       except errors.BlockDeviceError, err:
1132         errmsg = ("Can't make device r/w after creation, very"
1133                   " unusual event: %s" % str(err))
1134         logging.error(errmsg)
1135         return False, errmsg
1136     DevCacheManager.UpdateCache(device.dev_path, owner,
1137                                 on_primary, disk.iv_name)
1138
1139   device.SetInfo(info)
1140
1141   physical_id = device.unique_id
1142   return True, physical_id
1143
1144
1145 def BlockdevRemove(disk):
1146   """Remove a block device.
1147
1148   @note: This is intended to be called recursively.
1149
1150   @type disk: L{objects.Disk}
1151   @param disk: the disk object we should remove
1152   @rtype: boolean
1153   @return: the success of the operation
1154
1155   """
1156   msgs = []
1157   result = True
1158   try:
1159     rdev = _RecursiveFindBD(disk)
1160   except errors.BlockDeviceError, err:
1161     # probably can't attach
1162     logging.info("Can't attach to device %s in remove", disk)
1163     rdev = None
1164   if rdev is not None:
1165     r_path = rdev.dev_path
1166     try:
1167       rdev.Remove()
1168     except errors.BlockDeviceError, err:
1169       msgs.append(str(err))
1170       result = False
1171     if result:
1172       DevCacheManager.RemoveCache(r_path)
1173
1174   if disk.children:
1175     for child in disk.children:
1176       c_status, c_msg = BlockdevRemove(child)
1177       result = result and c_status
1178       if c_msg: # not an empty message
1179         msgs.append(c_msg)
1180
1181   return (result, "; ".join(msgs))
1182
1183
1184 def _RecursiveAssembleBD(disk, owner, as_primary):
1185   """Activate a block device for an instance.
1186
1187   This is run on the primary and secondary nodes for an instance.
1188
1189   @note: this function is called recursively.
1190
1191   @type disk: L{objects.Disk}
1192   @param disk: the disk we try to assemble
1193   @type owner: str
1194   @param owner: the name of the instance which owns the disk
1195   @type as_primary: boolean
1196   @param as_primary: if we should make the block device
1197       read/write
1198
1199   @return: the assembled device or None (in case no device
1200       was assembled)
1201   @raise errors.BlockDeviceError: in case there is an error
1202       during the activation of the children or the device
1203       itself
1204
1205   """
1206   children = []
1207   if disk.children:
1208     mcn = disk.ChildrenNeeded()
1209     if mcn == -1:
1210       mcn = 0 # max number of Nones allowed
1211     else:
1212       mcn = len(disk.children) - mcn # max number of Nones
1213     for chld_disk in disk.children:
1214       try:
1215         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1216       except errors.BlockDeviceError, err:
1217         if children.count(None) >= mcn:
1218           raise
1219         cdev = None
1220         logging.error("Error in child activation (but continuing): %s",
1221                       str(err))
1222       children.append(cdev)
1223
1224   if as_primary or disk.AssembleOnSecondary():
1225     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1226     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1227     result = r_dev
1228     if as_primary or disk.OpenOnSecondary():
1229       r_dev.Open()
1230     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1231                                 as_primary, disk.iv_name)
1232
1233   else:
1234     result = True
1235   return result
1236
1237
1238 def BlockdevAssemble(disk, owner, as_primary):
1239   """Activate a block device for an instance.
1240
1241   This is a wrapper over _RecursiveAssembleBD.
1242
1243   @rtype: str or boolean
1244   @return: a C{/dev/...} path for primary nodes, and
1245       C{True} for secondary nodes
1246
1247   """
1248   status = True
1249   result = "no error information"
1250   try:
1251     result = _RecursiveAssembleBD(disk, owner, as_primary)
1252     if isinstance(result, bdev.BlockDev):
1253       result = result.dev_path
1254   except errors.BlockDeviceError, err:
1255     result = "Error while assembling disk: %s" % str(err)
1256     status = False
1257   return (status, result)
1258
1259
1260 def BlockdevShutdown(disk):
1261   """Shut down a block device.
1262
1263   First, if the device is assembled (Attach() is successfull), then
1264   the device is shutdown. Then the children of the device are
1265   shutdown.
1266
1267   This function is called recursively. Note that we don't cache the
1268   children or such, as oppossed to assemble, shutdown of different
1269   devices doesn't require that the upper device was active.
1270
1271   @type disk: L{objects.Disk}
1272   @param disk: the description of the disk we should
1273       shutdown
1274   @rtype: boolean
1275   @return: the success of the operation
1276
1277   """
1278   msgs = []
1279   result = True
1280   r_dev = _RecursiveFindBD(disk)
1281   if r_dev is not None:
1282     r_path = r_dev.dev_path
1283     try:
1284       r_dev.Shutdown()
1285       DevCacheManager.RemoveCache(r_path)
1286     except errors.BlockDeviceError, err:
1287       msgs.append(str(err))
1288       result = False
1289
1290   if disk.children:
1291     for child in disk.children:
1292       c_status, c_msg = BlockdevShutdown(child)
1293       result = result and c_status
1294       if c_msg: # not an empty message
1295         msgs.append(c_msg)
1296
1297   return (result, "; ".join(msgs))
1298
1299
1300 def BlockdevAddchildren(parent_cdev, new_cdevs):
1301   """Extend a mirrored block device.
1302
1303   @type parent_cdev: L{objects.Disk}
1304   @param parent_cdev: the disk to which we should add children
1305   @type new_cdevs: list of L{objects.Disk}
1306   @param new_cdevs: the list of children which we should add
1307   @rtype: boolean
1308   @return: the success of the operation
1309
1310   """
1311   parent_bdev = _RecursiveFindBD(parent_cdev)
1312   if parent_bdev is None:
1313     logging.error("Can't find parent device")
1314     return False
1315   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1316   if new_bdevs.count(None) > 0:
1317     logging.error("Can't find new device(s) to add: %s:%s",
1318                   new_bdevs, new_cdevs)
1319     return False
1320   parent_bdev.AddChildren(new_bdevs)
1321   return True
1322
1323
1324 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1325   """Shrink a mirrored block device.
1326
1327   @type parent_cdev: L{objects.Disk}
1328   @param parent_cdev: the disk from which we should remove children
1329   @type new_cdevs: list of L{objects.Disk}
1330   @param new_cdevs: the list of children which we should remove
1331   @rtype: boolean
1332   @return: the success of the operation
1333
1334   """
1335   parent_bdev = _RecursiveFindBD(parent_cdev)
1336   if parent_bdev is None:
1337     logging.error("Can't find parent in remove children: %s", parent_cdev)
1338     return False
1339   devs = []
1340   for disk in new_cdevs:
1341     rpath = disk.StaticDevPath()
1342     if rpath is None:
1343       bd = _RecursiveFindBD(disk)
1344       if bd is None:
1345         logging.error("Can't find dynamic device %s while removing children",
1346                       disk)
1347         return False
1348       else:
1349         devs.append(bd.dev_path)
1350     else:
1351       devs.append(rpath)
1352   parent_bdev.RemoveChildren(devs)
1353   return True
1354
1355
1356 def BlockdevGetmirrorstatus(disks):
1357   """Get the mirroring status of a list of devices.
1358
1359   @type disks: list of L{objects.Disk}
1360   @param disks: the list of disks which we should query
1361   @rtype: disk
1362   @return:
1363       a list of (mirror_done, estimated_time) tuples, which
1364       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1365   @raise errors.BlockDeviceError: if any of the disks cannot be
1366       found
1367
1368   """
1369   stats = []
1370   for dsk in disks:
1371     rbd = _RecursiveFindBD(dsk)
1372     if rbd is None:
1373       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1374     stats.append(rbd.CombinedSyncStatus())
1375   return stats
1376
1377
1378 def _RecursiveFindBD(disk):
1379   """Check if a device is activated.
1380
1381   If so, return informations about the real device.
1382
1383   @type disk: L{objects.Disk}
1384   @param disk: the disk object we need to find
1385
1386   @return: None if the device can't be found,
1387       otherwise the device instance
1388
1389   """
1390   children = []
1391   if disk.children:
1392     for chdisk in disk.children:
1393       children.append(_RecursiveFindBD(chdisk))
1394
1395   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1396
1397
1398 def BlockdevFind(disk):
1399   """Check if a device is activated.
1400
1401   If it is, return informations about the real device.
1402
1403   @type disk: L{objects.Disk}
1404   @param disk: the disk to find
1405   @rtype: None or tuple
1406   @return: None if the disk cannot be found, otherwise a
1407       tuple (device_path, major, minor, sync_percent,
1408       estimated_time, is_degraded)
1409
1410   """
1411   try:
1412     rbd = _RecursiveFindBD(disk)
1413   except errors.BlockDeviceError, err:
1414     return (False, str(err))
1415   if rbd is None:
1416     return (True, None)
1417   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1418
1419
1420 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1421   """Write a file to the filesystem.
1422
1423   This allows the master to overwrite(!) a file. It will only perform
1424   the operation if the file belongs to a list of configuration files.
1425
1426   @type file_name: str
1427   @param file_name: the target file name
1428   @type data: str
1429   @param data: the new contents of the file
1430   @type mode: int
1431   @param mode: the mode to give the file (can be None)
1432   @type uid: int
1433   @param uid: the owner of the file (can be -1 for default)
1434   @type gid: int
1435   @param gid: the group of the file (can be -1 for default)
1436   @type atime: float
1437   @param atime: the atime to set on the file (can be None)
1438   @type mtime: float
1439   @param mtime: the mtime to set on the file (can be None)
1440   @rtype: boolean
1441   @return: the success of the operation; errors are logged
1442       in the node daemon log
1443
1444   """
1445   if not os.path.isabs(file_name):
1446     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1447                   file_name)
1448     return False
1449
1450   allowed_files = [
1451     constants.CLUSTER_CONF_FILE,
1452     constants.ETC_HOSTS,
1453     constants.SSH_KNOWN_HOSTS_FILE,
1454     constants.VNC_PASSWORD_FILE,
1455     ]
1456
1457   if file_name not in allowed_files:
1458     logging.error("Filename passed to UploadFile not in allowed"
1459                  " upload targets: '%s'", file_name)
1460     return False
1461
1462   raw_data = _Decompress(data)
1463
1464   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1465                   atime=atime, mtime=mtime)
1466   return True
1467
1468
1469 def WriteSsconfFiles(values):
1470   """Update all ssconf files.
1471
1472   Wrapper around the SimpleStore.WriteFiles.
1473
1474   """
1475   ssconf.SimpleStore().WriteFiles(values)
1476
1477
1478 def _ErrnoOrStr(err):
1479   """Format an EnvironmentError exception.
1480
1481   If the L{err} argument has an errno attribute, it will be looked up
1482   and converted into a textual C{E...} description. Otherwise the
1483   string representation of the error will be returned.
1484
1485   @type err: L{EnvironmentError}
1486   @param err: the exception to format
1487
1488   """
1489   if hasattr(err, 'errno'):
1490     detail = errno.errorcode[err.errno]
1491   else:
1492     detail = str(err)
1493   return detail
1494
1495
1496 def _OSOndiskVersion(name, os_dir):
1497   """Compute and return the API version of a given OS.
1498
1499   This function will try to read the API version of the OS given by
1500   the 'name' parameter and residing in the 'os_dir' directory.
1501
1502   @type name: str
1503   @param name: the OS name we should look for
1504   @type os_dir: str
1505   @param os_dir: the directory inwhich we should look for the OS
1506   @rtype: int or None
1507   @return:
1508       Either an integer denoting the version or None in the
1509       case when this is not a valid OS name.
1510   @raise errors.InvalidOS: if the OS cannot be found
1511
1512   """
1513   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1514
1515   try:
1516     st = os.stat(api_file)
1517   except EnvironmentError, err:
1518     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1519                            " found (%s)" % _ErrnoOrStr(err))
1520
1521   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1522     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1523                            " a regular file")
1524
1525   try:
1526     f = open(api_file)
1527     try:
1528       api_versions = f.readlines()
1529     finally:
1530       f.close()
1531   except EnvironmentError, err:
1532     raise errors.InvalidOS(name, os_dir, "error while reading the"
1533                            " API version (%s)" % _ErrnoOrStr(err))
1534
1535   api_versions = [version.strip() for version in api_versions]
1536   try:
1537     api_versions = [int(version) for version in api_versions]
1538   except (TypeError, ValueError), err:
1539     raise errors.InvalidOS(name, os_dir,
1540                            "API version is not integer (%s)" % str(err))
1541
1542   return api_versions
1543
1544
1545 def DiagnoseOS(top_dirs=None):
1546   """Compute the validity for all OSes.
1547
1548   @type top_dirs: list
1549   @param top_dirs: the list of directories in which to
1550       search (if not given defaults to
1551       L{constants.OS_SEARCH_PATH})
1552   @rtype: list of L{objects.OS}
1553   @return: an OS object for each name in all the given
1554       directories
1555
1556   """
1557   if top_dirs is None:
1558     top_dirs = constants.OS_SEARCH_PATH
1559
1560   result = []
1561   for dir_name in top_dirs:
1562     if os.path.isdir(dir_name):
1563       try:
1564         f_names = utils.ListVisibleFiles(dir_name)
1565       except EnvironmentError, err:
1566         logging.exception("Can't list the OS directory %s", dir_name)
1567         break
1568       for name in f_names:
1569         try:
1570           os_inst = OSFromDisk(name, base_dir=dir_name)
1571           result.append(os_inst)
1572         except errors.InvalidOS, err:
1573           result.append(objects.OS.FromInvalidOS(err))
1574
1575   return result
1576
1577
1578 def OSFromDisk(name, base_dir=None):
1579   """Create an OS instance from disk.
1580
1581   This function will return an OS instance if the given name is a
1582   valid OS name. Otherwise, it will raise an appropriate
1583   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1584
1585   @type base_dir: string
1586   @keyword base_dir: Base directory containing OS installations.
1587                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1588   @rtype: L{objects.OS}
1589   @return: the OS instance if we find a valid one
1590   @raise errors.InvalidOS: if we don't find a valid OS
1591
1592   """
1593   if base_dir is None:
1594     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1595     if os_dir is None:
1596       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1597   else:
1598     os_dir = os.path.sep.join([base_dir, name])
1599
1600   api_versions = _OSOndiskVersion(name, os_dir)
1601
1602   if constants.OS_API_VERSION not in api_versions:
1603     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1604                            " (found %s want %s)"
1605                            % (api_versions, constants.OS_API_VERSION))
1606
1607   # OS Scripts dictionary, we will populate it with the actual script names
1608   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1609
1610   for script in os_scripts:
1611     os_scripts[script] = os.path.sep.join([os_dir, script])
1612
1613     try:
1614       st = os.stat(os_scripts[script])
1615     except EnvironmentError, err:
1616       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1617                              (script, _ErrnoOrStr(err)))
1618
1619     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1620       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1621                              script)
1622
1623     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1624       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1625                              script)
1626
1627
1628   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1629                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1630                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1631                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1632                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1633                     api_versions=api_versions)
1634
1635 def OSEnvironment(instance, debug=0):
1636   """Calculate the environment for an os script.
1637
1638   @type instance: L{objects.Instance}
1639   @param instance: target instance for the os script run
1640   @type debug: integer
1641   @param debug: debug level (0 or 1, for OS Api 10)
1642   @rtype: dict
1643   @return: dict of environment variables
1644   @raise errors.BlockDeviceError: if the block device
1645       cannot be found
1646
1647   """
1648   result = {}
1649   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1650   result['INSTANCE_NAME'] = instance.name
1651   result['INSTANCE_OS'] = instance.os
1652   result['HYPERVISOR'] = instance.hypervisor
1653   result['DISK_COUNT'] = '%d' % len(instance.disks)
1654   result['NIC_COUNT'] = '%d' % len(instance.nics)
1655   result['DEBUG_LEVEL'] = '%d' % debug
1656   for idx, disk in enumerate(instance.disks):
1657     real_disk = _RecursiveFindBD(disk)
1658     if real_disk is None:
1659       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1660                                     str(disk))
1661     real_disk.Open()
1662     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1663     # FIXME: When disks will have read-only mode, populate this
1664     result['DISK_%d_ACCESS' % idx] = disk.mode
1665     if constants.HV_DISK_TYPE in instance.hvparams:
1666       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1667         instance.hvparams[constants.HV_DISK_TYPE]
1668     if disk.dev_type in constants.LDS_BLOCK:
1669       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1670     elif disk.dev_type == constants.LD_FILE:
1671       result['DISK_%d_BACKEND_TYPE' % idx] = \
1672         'file:%s' % disk.physical_id[0]
1673   for idx, nic in enumerate(instance.nics):
1674     result['NIC_%d_MAC' % idx] = nic.mac
1675     if nic.ip:
1676       result['NIC_%d_IP' % idx] = nic.ip
1677     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1678     if constants.HV_NIC_TYPE in instance.hvparams:
1679       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1680         instance.hvparams[constants.HV_NIC_TYPE]
1681
1682   return result
1683
1684 def BlockdevGrow(disk, amount):
1685   """Grow a stack of block devices.
1686
1687   This function is called recursively, with the childrens being the
1688   first ones to resize.
1689
1690   @type disk: L{objects.Disk}
1691   @param disk: the disk to be grown
1692   @rtype: (status, result)
1693   @return: a tuple with the status of the operation
1694       (True/False), and the errors message if status
1695       is False
1696
1697   """
1698   r_dev = _RecursiveFindBD(disk)
1699   if r_dev is None:
1700     return False, "Cannot find block device %s" % (disk,)
1701
1702   try:
1703     r_dev.Grow(amount)
1704   except errors.BlockDeviceError, err:
1705     return False, str(err)
1706
1707   return True, None
1708
1709
1710 def BlockdevSnapshot(disk):
1711   """Create a snapshot copy of a block device.
1712
1713   This function is called recursively, and the snapshot is actually created
1714   just for the leaf lvm backend device.
1715
1716   @type disk: L{objects.Disk}
1717   @param disk: the disk to be snapshotted
1718   @rtype: string
1719   @return: snapshot disk path
1720
1721   """
1722   if disk.children:
1723     if len(disk.children) == 1:
1724       # only one child, let's recurse on it
1725       return BlockdevSnapshot(disk.children[0])
1726     else:
1727       # more than one child, choose one that matches
1728       for child in disk.children:
1729         if child.size == disk.size:
1730           # return implies breaking the loop
1731           return BlockdevSnapshot(child)
1732   elif disk.dev_type == constants.LD_LV:
1733     r_dev = _RecursiveFindBD(disk)
1734     if r_dev is not None:
1735       # let's stay on the safe side and ask for the full size, for now
1736       return r_dev.Snapshot(disk.size)
1737     else:
1738       return None
1739   else:
1740     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1741                                  " '%s' of type '%s'" %
1742                                  (disk.unique_id, disk.dev_type))
1743
1744
1745 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1746   """Export a block device snapshot to a remote node.
1747
1748   @type disk: L{objects.Disk}
1749   @param disk: the description of the disk to export
1750   @type dest_node: str
1751   @param dest_node: the destination node to export to
1752   @type instance: L{objects.Instance}
1753   @param instance: the instance object to whom the disk belongs
1754   @type cluster_name: str
1755   @param cluster_name: the cluster name, needed for SSH hostalias
1756   @type idx: int
1757   @param idx: the index of the disk in the instance's disk list,
1758       used to export to the OS scripts environment
1759   @rtype: boolean
1760   @return: the success of the operation
1761
1762   """
1763   export_env = OSEnvironment(instance)
1764
1765   inst_os = OSFromDisk(instance.os)
1766   export_script = inst_os.export_script
1767
1768   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1769                                      instance.name, int(time.time()))
1770   if not os.path.exists(constants.LOG_OS_DIR):
1771     os.mkdir(constants.LOG_OS_DIR, 0750)
1772   real_disk = _RecursiveFindBD(disk)
1773   if real_disk is None:
1774     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1775                                   str(disk))
1776   real_disk.Open()
1777
1778   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1779   export_env['EXPORT_INDEX'] = str(idx)
1780
1781   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1782   destfile = disk.physical_id[1]
1783
1784   # the target command is built out of three individual commands,
1785   # which are joined by pipes; we check each individual command for
1786   # valid parameters
1787   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1788                                export_script, logfile)
1789
1790   comprcmd = "gzip"
1791
1792   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1793                                 destdir, destdir, destfile)
1794   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1795                                                    constants.GANETI_RUNAS,
1796                                                    destcmd)
1797
1798   # all commands have been checked, so we're safe to combine them
1799   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1800
1801   result = utils.RunCmd(command, env=export_env)
1802
1803   if result.failed:
1804     logging.error("os snapshot export command '%s' returned error: %s"
1805                   " output: %s", command, result.fail_reason, result.output)
1806     return False
1807
1808   return True
1809
1810
1811 def FinalizeExport(instance, snap_disks):
1812   """Write out the export configuration information.
1813
1814   @type instance: L{objects.Instance}
1815   @param instance: the instance which we export, used for
1816       saving configuration
1817   @type snap_disks: list of L{objects.Disk}
1818   @param snap_disks: list of snapshot block devices, which
1819       will be used to get the actual name of the dump file
1820
1821   @rtype: boolean
1822   @return: the success of the operation
1823
1824   """
1825   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1826   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1827
1828   config = objects.SerializableConfigParser()
1829
1830   config.add_section(constants.INISECT_EXP)
1831   config.set(constants.INISECT_EXP, 'version', '0')
1832   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1833   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1834   config.set(constants.INISECT_EXP, 'os', instance.os)
1835   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1836
1837   config.add_section(constants.INISECT_INS)
1838   config.set(constants.INISECT_INS, 'name', instance.name)
1839   config.set(constants.INISECT_INS, 'memory', '%d' %
1840              instance.beparams[constants.BE_MEMORY])
1841   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1842              instance.beparams[constants.BE_VCPUS])
1843   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1844
1845   nic_total = 0
1846   for nic_count, nic in enumerate(instance.nics):
1847     nic_total += 1
1848     config.set(constants.INISECT_INS, 'nic%d_mac' %
1849                nic_count, '%s' % nic.mac)
1850     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1851     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1852                '%s' % nic.bridge)
1853   # TODO: redundant: on load can read nics until it doesn't exist
1854   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1855
1856   disk_total = 0
1857   for disk_count, disk in enumerate(snap_disks):
1858     if disk:
1859       disk_total += 1
1860       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1861                  ('%s' % disk.iv_name))
1862       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1863                  ('%s' % disk.physical_id[1]))
1864       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1865                  ('%d' % disk.size))
1866
1867   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1868
1869   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1870                   data=config.Dumps())
1871   shutil.rmtree(finaldestdir, True)
1872   shutil.move(destdir, finaldestdir)
1873
1874   return True
1875
1876
1877 def ExportInfo(dest):
1878   """Get export configuration information.
1879
1880   @type dest: str
1881   @param dest: directory containing the export
1882
1883   @rtype: L{objects.SerializableConfigParser}
1884   @return: a serializable config file containing the
1885       export info
1886
1887   """
1888   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1889
1890   config = objects.SerializableConfigParser()
1891   config.read(cff)
1892
1893   if (not config.has_section(constants.INISECT_EXP) or
1894       not config.has_section(constants.INISECT_INS)):
1895     return None
1896
1897   return config
1898
1899
1900 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1901   """Import an os image into an instance.
1902
1903   @type instance: L{objects.Instance}
1904   @param instance: instance to import the disks into
1905   @type src_node: string
1906   @param src_node: source node for the disk images
1907   @type src_images: list of string
1908   @param src_images: absolute paths of the disk images
1909   @rtype: list of boolean
1910   @return: each boolean represent the success of importing the n-th disk
1911
1912   """
1913   import_env = OSEnvironment(instance)
1914   inst_os = OSFromDisk(instance.os)
1915   import_script = inst_os.import_script
1916
1917   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1918                                         instance.name, int(time.time()))
1919   if not os.path.exists(constants.LOG_OS_DIR):
1920     os.mkdir(constants.LOG_OS_DIR, 0750)
1921
1922   comprcmd = "gunzip"
1923   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1924                                import_script, logfile)
1925
1926   final_result = []
1927   for idx, image in enumerate(src_images):
1928     if image:
1929       destcmd = utils.BuildShellCmd('cat %s', image)
1930       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1931                                                        constants.GANETI_RUNAS,
1932                                                        destcmd)
1933       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1934       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1935       import_env['IMPORT_INDEX'] = str(idx)
1936       result = utils.RunCmd(command, env=import_env)
1937       if result.failed:
1938         logging.error("Disk import command '%s' returned error: %s"
1939                       " output: %s", command, result.fail_reason,
1940                       result.output)
1941         final_result.append(False)
1942       else:
1943         final_result.append(True)
1944     else:
1945       final_result.append(True)
1946
1947   return final_result
1948
1949
1950 def ListExports():
1951   """Return a list of exports currently available on this machine.
1952
1953   @rtype: list
1954   @return: list of the exports
1955
1956   """
1957   if os.path.isdir(constants.EXPORT_DIR):
1958     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1959   else:
1960     return []
1961
1962
1963 def RemoveExport(export):
1964   """Remove an existing export from the node.
1965
1966   @type export: str
1967   @param export: the name of the export to remove
1968   @rtype: boolean
1969   @return: the success of the operation
1970
1971   """
1972   target = os.path.join(constants.EXPORT_DIR, export)
1973
1974   shutil.rmtree(target)
1975   # TODO: catch some of the relevant exceptions and provide a pretty
1976   # error message if rmtree fails.
1977
1978   return True
1979
1980
1981 def BlockdevRename(devlist):
1982   """Rename a list of block devices.
1983
1984   @type devlist: list of tuples
1985   @param devlist: list of tuples of the form  (disk,
1986       new_logical_id, new_physical_id); disk is an
1987       L{objects.Disk} object describing the current disk,
1988       and new logical_id/physical_id is the name we
1989       rename it to
1990   @rtype: boolean
1991   @return: True if all renames succeeded, False otherwise
1992
1993   """
1994   result = True
1995   for disk, unique_id in devlist:
1996     dev = _RecursiveFindBD(disk)
1997     if dev is None:
1998       result = False
1999       continue
2000     try:
2001       old_rpath = dev.dev_path
2002       dev.Rename(unique_id)
2003       new_rpath = dev.dev_path
2004       if old_rpath != new_rpath:
2005         DevCacheManager.RemoveCache(old_rpath)
2006         # FIXME: we should add the new cache information here, like:
2007         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2008         # but we don't have the owner here - maybe parse from existing
2009         # cache? for now, we only lose lvm data when we rename, which
2010         # is less critical than DRBD or MD
2011     except errors.BlockDeviceError, err:
2012       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2013       result = False
2014   return result
2015
2016
2017 def _TransformFileStorageDir(file_storage_dir):
2018   """Checks whether given file_storage_dir is valid.
2019
2020   Checks wheter the given file_storage_dir is within the cluster-wide
2021   default file_storage_dir stored in SimpleStore. Only paths under that
2022   directory are allowed.
2023
2024   @type file_storage_dir: str
2025   @param file_storage_dir: the path to check
2026
2027   @return: the normalized path if valid, None otherwise
2028
2029   """
2030   cfg = _GetConfig()
2031   file_storage_dir = os.path.normpath(file_storage_dir)
2032   base_file_storage_dir = cfg.GetFileStorageDir()
2033   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2034       base_file_storage_dir):
2035     logging.error("file storage directory '%s' is not under base file"
2036                   " storage directory '%s'",
2037                   file_storage_dir, base_file_storage_dir)
2038     return None
2039   return file_storage_dir
2040
2041
2042 def CreateFileStorageDir(file_storage_dir):
2043   """Create file storage directory.
2044
2045   @type file_storage_dir: str
2046   @param file_storage_dir: directory to create
2047
2048   @rtype: tuple
2049   @return: tuple with first element a boolean indicating wheter dir
2050       creation was successful or not
2051
2052   """
2053   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2054   result = True,
2055   if not file_storage_dir:
2056     result = False,
2057   else:
2058     if os.path.exists(file_storage_dir):
2059       if not os.path.isdir(file_storage_dir):
2060         logging.error("'%s' is not a directory", file_storage_dir)
2061         result = False,
2062     else:
2063       try:
2064         os.makedirs(file_storage_dir, 0750)
2065       except OSError, err:
2066         logging.error("Cannot create file storage directory '%s': %s",
2067                       file_storage_dir, err)
2068         result = False,
2069   return result
2070
2071
2072 def RemoveFileStorageDir(file_storage_dir):
2073   """Remove file storage directory.
2074
2075   Remove it only if it's empty. If not log an error and return.
2076
2077   @type file_storage_dir: str
2078   @param file_storage_dir: the directory we should cleanup
2079   @rtype: tuple (success,)
2080   @return: tuple of one element, C{success}, denoting
2081       whether the operation was successfull
2082
2083   """
2084   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2085   result = True,
2086   if not file_storage_dir:
2087     result = False,
2088   else:
2089     if os.path.exists(file_storage_dir):
2090       if not os.path.isdir(file_storage_dir):
2091         logging.error("'%s' is not a directory", file_storage_dir)
2092         result = False,
2093       # deletes dir only if empty, otherwise we want to return False
2094       try:
2095         os.rmdir(file_storage_dir)
2096       except OSError, err:
2097         logging.exception("Cannot remove file storage directory '%s'",
2098                           file_storage_dir)
2099         result = False,
2100   return result
2101
2102
2103 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2104   """Rename the file storage directory.
2105
2106   @type old_file_storage_dir: str
2107   @param old_file_storage_dir: the current path
2108   @type new_file_storage_dir: str
2109   @param new_file_storage_dir: the name we should rename to
2110   @rtype: tuple (success,)
2111   @return: tuple of one element, C{success}, denoting
2112       whether the operation was successful
2113
2114   """
2115   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2116   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2117   result = True,
2118   if not old_file_storage_dir or not new_file_storage_dir:
2119     result = False,
2120   else:
2121     if not os.path.exists(new_file_storage_dir):
2122       if os.path.isdir(old_file_storage_dir):
2123         try:
2124           os.rename(old_file_storage_dir, new_file_storage_dir)
2125         except OSError, err:
2126           logging.exception("Cannot rename '%s' to '%s'",
2127                             old_file_storage_dir, new_file_storage_dir)
2128           result =  False,
2129       else:
2130         logging.error("'%s' is not a directory", old_file_storage_dir)
2131         result = False,
2132     else:
2133       if os.path.exists(old_file_storage_dir):
2134         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2135                       old_file_storage_dir, new_file_storage_dir)
2136         result = False,
2137   return result
2138
2139
2140 def _IsJobQueueFile(file_name):
2141   """Checks whether the given filename is in the queue directory.
2142
2143   @type file_name: str
2144   @param file_name: the file name we should check
2145   @rtype: boolean
2146   @return: whether the file is under the queue directory
2147
2148   """
2149   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2150   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2151
2152   if not result:
2153     logging.error("'%s' is not a file in the queue directory",
2154                   file_name)
2155
2156   return result
2157
2158
2159 def JobQueueUpdate(file_name, content):
2160   """Updates a file in the queue directory.
2161
2162   This is just a wrapper over L{utils.WriteFile}, with proper
2163   checking.
2164
2165   @type file_name: str
2166   @param file_name: the job file name
2167   @type content: str
2168   @param content: the new job contents
2169   @rtype: boolean
2170   @return: the success of the operation
2171
2172   """
2173   if not _IsJobQueueFile(file_name):
2174     return False
2175
2176   # Write and replace the file atomically
2177   utils.WriteFile(file_name, data=_Decompress(content))
2178
2179   return True
2180
2181
2182 def JobQueueRename(old, new):
2183   """Renames a job queue file.
2184
2185   This is just a wrapper over os.rename with proper checking.
2186
2187   @type old: str
2188   @param old: the old (actual) file name
2189   @type new: str
2190   @param new: the desired file name
2191   @rtype: boolean
2192   @return: the success of the operation
2193
2194   """
2195   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2196     return False
2197
2198   utils.RenameFile(old, new, mkdir=True)
2199
2200   return True
2201
2202
2203 def JobQueueSetDrainFlag(drain_flag):
2204   """Set the drain flag for the queue.
2205
2206   This will set or unset the queue drain flag.
2207
2208   @type drain_flag: boolean
2209   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2210   @rtype: boolean
2211   @return: always True
2212   @warning: the function always returns True
2213
2214   """
2215   if drain_flag:
2216     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2217   else:
2218     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2219
2220   return True
2221
2222
2223 def BlockdevClose(instance_name, disks):
2224   """Closes the given block devices.
2225
2226   This means they will be switched to secondary mode (in case of
2227   DRBD).
2228
2229   @param instance_name: if the argument is not empty, the symlinks
2230       of this instance will be removed
2231   @type disks: list of L{objects.Disk}
2232   @param disks: the list of disks to be closed
2233   @rtype: tuple (success, message)
2234   @return: a tuple of success and message, where success
2235       indicates the succes of the operation, and message
2236       which will contain the error details in case we
2237       failed
2238
2239   """
2240   bdevs = []
2241   for cf in disks:
2242     rd = _RecursiveFindBD(cf)
2243     if rd is None:
2244       return (False, "Can't find device %s" % cf)
2245     bdevs.append(rd)
2246
2247   msg = []
2248   for rd in bdevs:
2249     try:
2250       rd.Close()
2251     except errors.BlockDeviceError, err:
2252       msg.append(str(err))
2253   if msg:
2254     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2255   else:
2256     if instance_name:
2257       _RemoveBlockDevLinks(instance_name, disks)
2258     return (True, "All devices secondary")
2259
2260
2261 def ValidateHVParams(hvname, hvparams):
2262   """Validates the given hypervisor parameters.
2263
2264   @type hvname: string
2265   @param hvname: the hypervisor name
2266   @type hvparams: dict
2267   @param hvparams: the hypervisor parameters to be validated
2268   @rtype: tuple (success, message)
2269   @return: a tuple of success and message, where success
2270       indicates the succes of the operation, and message
2271       which will contain the error details in case we
2272       failed
2273
2274   """
2275   try:
2276     hv_type = hypervisor.GetHypervisor(hvname)
2277     hv_type.ValidateParameters(hvparams)
2278     return (True, "Validation passed")
2279   except errors.HypervisorError, err:
2280     return (False, str(err))
2281
2282
2283 def DemoteFromMC():
2284   """Demotes the current node from master candidate role.
2285
2286   """
2287   # try to ensure we're not the master by mistake
2288   master, myself = ssconf.GetMasterAndMyself()
2289   if master == myself:
2290     return (False, "ssconf status shows I'm the master node, will not demote")
2291   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2292   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2293     return (False, "The master daemon is running, will not demote")
2294   try:
2295     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2296   except EnvironmentError, err:
2297     if err.errno != errno.ENOENT:
2298       return (False, "Error while backing up cluster file: %s" % str(err))
2299   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2300   return (True, "Done")
2301
2302
2303 def _FindDisks(nodes_ip, disks):
2304   """Sets the physical ID on disks and returns the block devices.
2305
2306   """
2307   # set the correct physical ID
2308   my_name = utils.HostInfo().name
2309   for cf in disks:
2310     cf.SetPhysicalID(my_name, nodes_ip)
2311
2312   bdevs = []
2313
2314   for cf in disks:
2315     rd = _RecursiveFindBD(cf)
2316     if rd is None:
2317       return (False, "Can't find device %s" % cf)
2318     bdevs.append(rd)
2319   return (True, bdevs)
2320
2321
2322 def DrbdDisconnectNet(nodes_ip, disks):
2323   """Disconnects the network on a list of drbd devices.
2324
2325   """
2326   status, bdevs = _FindDisks(nodes_ip, disks)
2327   if not status:
2328     return status, bdevs
2329
2330   # disconnect disks
2331   for rd in bdevs:
2332     try:
2333       rd.DisconnectNet()
2334     except errors.BlockDeviceError, err:
2335       logging.exception("Failed to go into standalone mode")
2336       return (False, "Can't change network configuration: %s" % str(err))
2337   return (True, "All disks are now disconnected")
2338
2339
2340 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2341   """Attaches the network on a list of drbd devices.
2342
2343   """
2344   status, bdevs = _FindDisks(nodes_ip, disks)
2345   if not status:
2346     return status, bdevs
2347
2348   if multimaster:
2349     for idx, rd in enumerate(bdevs):
2350       try:
2351         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2352       except EnvironmentError, err:
2353         return (False, "Can't create symlink: %s" % str(err))
2354   # reconnect disks, switch to new master configuration and if
2355   # needed primary mode
2356   for rd in bdevs:
2357     try:
2358       rd.AttachNet(multimaster)
2359     except errors.BlockDeviceError, err:
2360       return (False, "Can't change network configuration: %s" % str(err))
2361   # wait until the disks are connected; we need to retry the re-attach
2362   # if the device becomes standalone, as this might happen if the one
2363   # node disconnects and reconnects in a different mode before the
2364   # other node reconnects; in this case, one or both of the nodes will
2365   # decide it has wrong configuration and switch to standalone
2366   RECONNECT_TIMEOUT = 2 * 60
2367   sleep_time = 0.100 # start with 100 miliseconds
2368   timeout_limit = time.time() + RECONNECT_TIMEOUT
2369   while time.time() < timeout_limit:
2370     all_connected = True
2371     for rd in bdevs:
2372       stats = rd.GetProcStatus()
2373       if not (stats.is_connected or stats.is_in_resync):
2374         all_connected = False
2375       if stats.is_standalone:
2376         # peer had different config info and this node became
2377         # standalone, even though this should not happen with the
2378         # new staged way of changing disk configs
2379         try:
2380           rd.ReAttachNet(multimaster)
2381         except errors.BlockDeviceError, err:
2382           return (False, "Can't change network configuration: %s" % str(err))
2383     if all_connected:
2384       break
2385     time.sleep(sleep_time)
2386     sleep_time = min(5, sleep_time * 1.5)
2387   if not all_connected:
2388     return (False, "Timeout in disk reconnecting")
2389   if multimaster:
2390     # change to primary mode
2391     for rd in bdevs:
2392       try:
2393         rd.Open()
2394       except errors.BlockDeviceError, err:
2395         return (False, "Can't change to primary mode: %s" % str(err))
2396   if multimaster:
2397     msg = "multi-master and primary"
2398   else:
2399     msg = "single-master"
2400   return (True, "Disks are now configured as %s" % msg)
2401
2402
2403 def DrbdWaitSync(nodes_ip, disks):
2404   """Wait until DRBDs have synchronized.
2405
2406   """
2407   status, bdevs = _FindDisks(nodes_ip, disks)
2408   if not status:
2409     return status, bdevs
2410
2411   min_resync = 100
2412   alldone = True
2413   failure = False
2414   for rd in bdevs:
2415     stats = rd.GetProcStatus()
2416     if not (stats.is_connected or stats.is_in_resync):
2417       failure = True
2418       break
2419     alldone = alldone and (not stats.is_in_resync)
2420     if stats.sync_percent is not None:
2421       min_resync = min(min_resync, stats.sync_percent)
2422   return (not failure, (alldone, min_resync))
2423
2424
2425 class HooksRunner(object):
2426   """Hook runner.
2427
2428   This class is instantiated on the node side (ganeti-noded) and not
2429   on the master side.
2430
2431   """
2432   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2433
2434   def __init__(self, hooks_base_dir=None):
2435     """Constructor for hooks runner.
2436
2437     @type hooks_base_dir: str or None
2438     @param hooks_base_dir: if not None, this overrides the
2439         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2440
2441     """
2442     if hooks_base_dir is None:
2443       hooks_base_dir = constants.HOOKS_BASE_DIR
2444     self._BASE_DIR = hooks_base_dir
2445
2446   @staticmethod
2447   def ExecHook(script, env):
2448     """Exec one hook script.
2449
2450     @type script: str
2451     @param script: the full path to the script
2452     @type env: dict
2453     @param env: the environment with which to exec the script
2454     @rtype: tuple (success, message)
2455     @return: a tuple of success and message, where success
2456         indicates the succes of the operation, and message
2457         which will contain the error details in case we
2458         failed
2459
2460     """
2461     # exec the process using subprocess and log the output
2462     fdstdin = None
2463     try:
2464       fdstdin = open("/dev/null", "r")
2465       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2466                                stderr=subprocess.STDOUT, close_fds=True,
2467                                shell=False, cwd="/", env=env)
2468       output = ""
2469       try:
2470         output = child.stdout.read(4096)
2471         child.stdout.close()
2472       except EnvironmentError, err:
2473         output += "Hook script error: %s" % str(err)
2474
2475       while True:
2476         try:
2477           result = child.wait()
2478           break
2479         except EnvironmentError, err:
2480           if err.errno == errno.EINTR:
2481             continue
2482           raise
2483     finally:
2484       # try not to leak fds
2485       for fd in (fdstdin, ):
2486         if fd is not None:
2487           try:
2488             fd.close()
2489           except EnvironmentError, err:
2490             # just log the error
2491             #logging.exception("Error while closing fd %s", fd)
2492             pass
2493
2494     return result == 0, utils.SafeEncode(output.strip())
2495
2496   def RunHooks(self, hpath, phase, env):
2497     """Run the scripts in the hooks directory.
2498
2499     @type hpath: str
2500     @param hpath: the path to the hooks directory which
2501         holds the scripts
2502     @type phase: str
2503     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2504         L{constants.HOOKS_PHASE_POST}
2505     @type env: dict
2506     @param env: dictionary with the environment for the hook
2507     @rtype: list
2508     @return: list of 3-element tuples:
2509       - script path
2510       - script result, either L{constants.HKR_SUCCESS} or
2511         L{constants.HKR_FAIL}
2512       - output of the script
2513
2514     @raise errors.ProgrammerError: for invalid input
2515         parameters
2516
2517     """
2518     if phase == constants.HOOKS_PHASE_PRE:
2519       suffix = "pre"
2520     elif phase == constants.HOOKS_PHASE_POST:
2521       suffix = "post"
2522     else:
2523       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2524     rr = []
2525
2526     subdir = "%s-%s.d" % (hpath, suffix)
2527     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2528     try:
2529       dir_contents = utils.ListVisibleFiles(dir_name)
2530     except OSError, err:
2531       # FIXME: must log output in case of failures
2532       return rr
2533
2534     # we use the standard python sort order,
2535     # so 00name is the recommended naming scheme
2536     dir_contents.sort()
2537     for relname in dir_contents:
2538       fname = os.path.join(dir_name, relname)
2539       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2540           self.RE_MASK.match(relname) is not None):
2541         rrval = constants.HKR_SKIP
2542         output = ""
2543       else:
2544         result, output = self.ExecHook(fname, env)
2545         if not result:
2546           rrval = constants.HKR_FAIL
2547         else:
2548           rrval = constants.HKR_SUCCESS
2549       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2550
2551     return rr
2552
2553
2554 class IAllocatorRunner(object):
2555   """IAllocator runner.
2556
2557   This class is instantiated on the node side (ganeti-noded) and not on
2558   the master side.
2559
2560   """
2561   def Run(self, name, idata):
2562     """Run an iallocator script.
2563
2564     @type name: str
2565     @param name: the iallocator script name
2566     @type idata: str
2567     @param idata: the allocator input data
2568
2569     @rtype: tuple
2570     @return: four element tuple of:
2571        - run status (one of the IARUN_ constants)
2572        - stdout
2573        - stderr
2574        - fail reason (as from L{utils.RunResult})
2575
2576     """
2577     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2578                                   os.path.isfile)
2579     if alloc_script is None:
2580       return (constants.IARUN_NOTFOUND, None, None, None)
2581
2582     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2583     try:
2584       os.write(fd, idata)
2585       os.close(fd)
2586       result = utils.RunCmd([alloc_script, fin_name])
2587       if result.failed:
2588         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2589                 result.fail_reason)
2590     finally:
2591       os.unlink(fin_name)
2592
2593     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2594
2595
2596 class DevCacheManager(object):
2597   """Simple class for managing a cache of block device information.
2598
2599   """
2600   _DEV_PREFIX = "/dev/"
2601   _ROOT_DIR = constants.BDEV_CACHE_DIR
2602
2603   @classmethod
2604   def _ConvertPath(cls, dev_path):
2605     """Converts a /dev/name path to the cache file name.
2606
2607     This replaces slashes with underscores and strips the /dev
2608     prefix. It then returns the full path to the cache file.
2609
2610     @type dev_path: str
2611     @param dev_path: the C{/dev/} path name
2612     @rtype: str
2613     @return: the converted path name
2614
2615     """
2616     if dev_path.startswith(cls._DEV_PREFIX):
2617       dev_path = dev_path[len(cls._DEV_PREFIX):]
2618     dev_path = dev_path.replace("/", "_")
2619     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2620     return fpath
2621
2622   @classmethod
2623   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2624     """Updates the cache information for a given device.
2625
2626     @type dev_path: str
2627     @param dev_path: the pathname of the device
2628     @type owner: str
2629     @param owner: the owner (instance name) of the device
2630     @type on_primary: bool
2631     @param on_primary: whether this is the primary
2632         node nor not
2633     @type iv_name: str
2634     @param iv_name: the instance-visible name of the
2635         device, as in objects.Disk.iv_name
2636
2637     @rtype: None
2638
2639     """
2640     if dev_path is None:
2641       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2642       return
2643     fpath = cls._ConvertPath(dev_path)
2644     if on_primary:
2645       state = "primary"
2646     else:
2647       state = "secondary"
2648     if iv_name is None:
2649       iv_name = "not_visible"
2650     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2651     try:
2652       utils.WriteFile(fpath, data=fdata)
2653     except EnvironmentError, err:
2654       logging.exception("Can't update bdev cache for %s", dev_path)
2655
2656   @classmethod
2657   def RemoveCache(cls, dev_path):
2658     """Remove data for a dev_path.
2659
2660     This is just a wrapper over L{utils.RemoveFile} with a converted
2661     path name and logging.
2662
2663     @type dev_path: str
2664     @param dev_path: the pathname of the device
2665
2666     @rtype: None
2667
2668     """
2669     if dev_path is None:
2670       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2671       return
2672     fpath = cls._ConvertPath(dev_path)
2673     try:
2674       utils.RemoveFile(fpath)
2675     except EnvironmentError, err:
2676       logging.exception("Can't update bdev cache for %s", dev_path)