48d0dc8ca794520ad77226bfc531ff338252dc90
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     msg = "Error while processing user ssh files"
264     logging.exception(msg)
265     return (False, "%s: %s" % (msg, err))
266
267   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
268     utils.WriteFile(name, data=content, mode=0600)
269
270   utils.AddAuthorizedKey(auth_keys, sshpub)
271
272   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
273
274   return (True, "Node added successfully")
275
276
277 def LeaveCluster():
278   """Cleans up and remove the current node.
279
280   This function cleans up and prepares the current node to be removed
281   from the cluster.
282
283   If processing is successful, then it raises an
284   L{errors.QuitGanetiException} which is used as a special case to
285   shutdown the node daemon.
286
287   """
288   _CleanDirectory(constants.DATA_DIR)
289   JobQueuePurge()
290
291   try:
292     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
293   except errors.OpExecError:
294     logging.exception("Error while processing ssh files")
295     return
296
297   f = open(pub_key, 'r')
298   try:
299     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
300   finally:
301     f.close()
302
303   utils.RemoveFile(priv_key)
304   utils.RemoveFile(pub_key)
305
306   # Return a reassuring string to the caller, and quit
307   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
308
309
310 def GetNodeInfo(vgname, hypervisor_type):
311   """Gives back a hash with different informations about the node.
312
313   @type vgname: C{string}
314   @param vgname: the name of the volume group to ask for disk space information
315   @type hypervisor_type: C{str}
316   @param hypervisor_type: the name of the hypervisor to ask for
317       memory information
318   @rtype: C{dict}
319   @return: dictionary with the following keys:
320       - vg_size is the size of the configured volume group in MiB
321       - vg_free is the free size of the volume group in MiB
322       - memory_dom0 is the memory allocated for domain0 in MiB
323       - memory_free is the currently available (free) ram in MiB
324       - memory_total is the total number of ram in MiB
325
326   """
327   outputarray = {}
328   vginfo = _GetVGInfo(vgname)
329   outputarray['vg_size'] = vginfo['vg_size']
330   outputarray['vg_free'] = vginfo['vg_free']
331
332   hyper = hypervisor.GetHypervisor(hypervisor_type)
333   hyp_info = hyper.GetNodeInfo()
334   if hyp_info is not None:
335     outputarray.update(hyp_info)
336
337   f = open("/proc/sys/kernel/random/boot_id", 'r')
338   try:
339     outputarray["bootid"] = f.read(128).rstrip("\n")
340   finally:
341     f.close()
342
343   return outputarray
344
345
346 def VerifyNode(what, cluster_name):
347   """Verify the status of the local node.
348
349   Based on the input L{what} parameter, various checks are done on the
350   local node.
351
352   If the I{filelist} key is present, this list of
353   files is checksummed and the file/checksum pairs are returned.
354
355   If the I{nodelist} key is present, we check that we have
356   connectivity via ssh with the target nodes (and check the hostname
357   report).
358
359   If the I{node-net-test} key is present, we check that we have
360   connectivity to the given nodes via both primary IP and, if
361   applicable, secondary IPs.
362
363   @type what: C{dict}
364   @param what: a dictionary of things to check:
365       - filelist: list of files for which to compute checksums
366       - nodelist: list of nodes we should check ssh communication with
367       - node-net-test: list of nodes we should check node daemon port
368         connectivity with
369       - hypervisor: list with hypervisors to run the verify for
370   @rtype: dict
371   @return: a dictionary with the same keys as the input dict, and
372       values representing the result of the checks
373
374   """
375   result = {}
376
377   if constants.NV_HYPERVISOR in what:
378     result[constants.NV_HYPERVISOR] = tmp = {}
379     for hv_name in what[constants.NV_HYPERVISOR]:
380       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
381
382   if constants.NV_FILELIST in what:
383     result[constants.NV_FILELIST] = utils.FingerprintFiles(
384       what[constants.NV_FILELIST])
385
386   if constants.NV_NODELIST in what:
387     result[constants.NV_NODELIST] = tmp = {}
388     random.shuffle(what[constants.NV_NODELIST])
389     for node in what[constants.NV_NODELIST]:
390       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
391       if not success:
392         tmp[node] = message
393
394   if constants.NV_NODENETTEST in what:
395     result[constants.NV_NODENETTEST] = tmp = {}
396     my_name = utils.HostInfo().name
397     my_pip = my_sip = None
398     for name, pip, sip in what[constants.NV_NODENETTEST]:
399       if name == my_name:
400         my_pip = pip
401         my_sip = sip
402         break
403     if not my_pip:
404       tmp[my_name] = ("Can't find my own primary/secondary IP"
405                       " in the node list")
406     else:
407       port = utils.GetNodeDaemonPort()
408       for name, pip, sip in what[constants.NV_NODENETTEST]:
409         fail = []
410         if not utils.TcpPing(pip, port, source=my_pip):
411           fail.append("primary")
412         if sip != pip:
413           if not utils.TcpPing(sip, port, source=my_sip):
414             fail.append("secondary")
415         if fail:
416           tmp[name] = ("failure using the %s interface(s)" %
417                        " and ".join(fail))
418
419   if constants.NV_LVLIST in what:
420     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
421
422   if constants.NV_INSTANCELIST in what:
423     result[constants.NV_INSTANCELIST] = GetInstanceList(
424       what[constants.NV_INSTANCELIST])
425
426   if constants.NV_VGLIST in what:
427     result[constants.NV_VGLIST] = ListVolumeGroups()
428
429   if constants.NV_VERSION in what:
430     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
431                                     constants.RELEASE_VERSION)
432
433   if constants.NV_HVINFO in what:
434     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
435     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
436
437   if constants.NV_DRBDLIST in what:
438     try:
439       used_minors = bdev.DRBD8.GetUsedDevs().keys()
440     except errors.BlockDeviceError:
441       logging.warning("Can't get used minors list", exc_info=True)
442       used_minors = []
443     result[constants.NV_DRBDLIST] = used_minors
444
445   return result
446
447
448 def GetVolumeList(vg_name):
449   """Compute list of logical volumes and their size.
450
451   @type vg_name: str
452   @param vg_name: the volume group whose LVs we should list
453   @rtype: dict
454   @return:
455       dictionary of all partions (key) with value being a tuple of
456       their size (in MiB), inactive and online status::
457
458         {'test1': ('20.06', True, True)}
459
460       in case of errors, a string is returned with the error
461       details.
462
463   """
464   lvs = {}
465   sep = '|'
466   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
467                          "--separator=%s" % sep,
468                          "-olv_name,lv_size,lv_attr", vg_name])
469   if result.failed:
470     logging.error("Failed to list logical volumes, lvs output: %s",
471                   result.output)
472     return result.output
473
474   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
475   for line in result.stdout.splitlines():
476     line = line.strip()
477     match = valid_line_re.match(line)
478     if not match:
479       logging.error("Invalid line returned from lvs output: '%s'", line)
480       continue
481     name, size, attr = match.groups()
482     inactive = attr[4] == '-'
483     online = attr[5] == 'o'
484     lvs[name] = (size, inactive, online)
485
486   return lvs
487
488
489 def ListVolumeGroups():
490   """List the volume groups and their size.
491
492   @rtype: dict
493   @return: dictionary with keys volume name and values the
494       size of the volume
495
496   """
497   return utils.ListVolumeGroups()
498
499
500 def NodeVolumes():
501   """List all volumes on this node.
502
503   @rtype: list
504   @return:
505     A list of dictionaries, each having four keys:
506       - name: the logical volume name,
507       - size: the size of the logical volume
508       - dev: the physical device on which the LV lives
509       - vg: the volume group to which it belongs
510
511     In case of errors, we return an empty list and log the
512     error.
513
514     Note that since a logical volume can live on multiple physical
515     volumes, the resulting list might include a logical volume
516     multiple times.
517
518   """
519   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
520                          "--separator=|",
521                          "--options=lv_name,lv_size,devices,vg_name"])
522   if result.failed:
523     logging.error("Failed to list logical volumes, lvs output: %s",
524                   result.output)
525     return []
526
527   def parse_dev(dev):
528     if '(' in dev:
529       return dev.split('(')[0]
530     else:
531       return dev
532
533   def map_line(line):
534     return {
535       'name': line[0].strip(),
536       'size': line[1].strip(),
537       'dev': parse_dev(line[2].strip()),
538       'vg': line[3].strip(),
539     }
540
541   return [map_line(line.split('|')) for line in result.stdout.splitlines()
542           if line.count('|') >= 3]
543
544
545 def BridgesExist(bridges_list):
546   """Check if a list of bridges exist on the current node.
547
548   @rtype: boolean
549   @return: C{True} if all of them exist, C{False} otherwise
550
551   """
552   for bridge in bridges_list:
553     if not utils.BridgeExists(bridge):
554       return False
555
556   return True
557
558
559 def GetInstanceList(hypervisor_list):
560   """Provides a list of instances.
561
562   @type hypervisor_list: list
563   @param hypervisor_list: the list of hypervisors to query information
564
565   @rtype: list
566   @return: a list of all running instances on the current node
567     - instance1.example.com
568     - instance2.example.com
569
570   """
571   results = []
572   for hname in hypervisor_list:
573     try:
574       names = hypervisor.GetHypervisor(hname).ListInstances()
575       results.extend(names)
576     except errors.HypervisorError, err:
577       logging.exception("Error enumerating instances for hypevisor %s", hname)
578       # FIXME: should we somehow not propagate this to the master?
579       raise
580
581   return results
582
583
584 def GetInstanceInfo(instance, hname):
585   """Gives back the informations about an instance as a dictionary.
586
587   @type instance: string
588   @param instance: the instance name
589   @type hname: string
590   @param hname: the hypervisor type of the instance
591
592   @rtype: dict
593   @return: dictionary with the following keys:
594       - memory: memory size of instance (int)
595       - state: xen state of instance (string)
596       - time: cpu time of instance (float)
597
598   """
599   output = {}
600
601   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
602   if iinfo is not None:
603     output['memory'] = iinfo[2]
604     output['state'] = iinfo[4]
605     output['time'] = iinfo[5]
606
607   return output
608
609
610 def GetInstanceMigratable(instance):
611   """Gives whether an instance can be migrated.
612
613   @type instance: L{objects.Instance}
614   @param instance: object representing the instance to be checked.
615
616   @rtype: tuple
617   @return: tuple of (result, description) where:
618       - result: whether the instance can be migrated or not
619       - description: a description of the issue, if relevant
620
621   """
622   hyper = hypervisor.GetHypervisor(instance.hypervisor)
623   if instance.name not in hyper.ListInstances():
624     return (False, 'not running')
625
626   for idx in range(len(instance.disks)):
627     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
628     if not os.path.islink(link_name):
629       return (False, 'not restarted since ganeti 1.2.5')
630
631   return (True, '')
632
633
634 def GetAllInstancesInfo(hypervisor_list):
635   """Gather data about all instances.
636
637   This is the equivalent of L{GetInstanceInfo}, except that it
638   computes data for all instances at once, thus being faster if one
639   needs data about more than one instance.
640
641   @type hypervisor_list: list
642   @param hypervisor_list: list of hypervisors to query for instance data
643
644   @rtype: dict
645   @return: dictionary of instance: data, with data having the following keys:
646       - memory: memory size of instance (int)
647       - state: xen state of instance (string)
648       - time: cpu time of instance (float)
649       - vcpus: the number of vcpus
650
651   """
652   output = {}
653
654   for hname in hypervisor_list:
655     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
656     if iinfo:
657       for name, inst_id, memory, vcpus, state, times in iinfo:
658         value = {
659           'memory': memory,
660           'vcpus': vcpus,
661           'state': state,
662           'time': times,
663           }
664         if name in output and output[name] != value:
665           raise errors.HypervisorError("Instance %s running duplicate"
666                                        " with different parameters" % name)
667         output[name] = value
668
669   return output
670
671
672 def InstanceOsAdd(instance):
673   """Add an OS to an instance.
674
675   @type instance: L{objects.Instance}
676   @param instance: Instance whose OS is to be installed
677   @rtype: boolean
678   @return: the success of the operation
679
680   """
681   try:
682     inst_os = OSFromDisk(instance.os)
683   except errors.InvalidOS, err:
684     os_name, os_dir, os_err = err.args
685     if os_dir is None:
686       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
687     else:
688       return (False, "Error parsing OS '%s' in directory %s: %s" %
689               (os_name, os_dir, os_err))
690
691   create_env = OSEnvironment(instance)
692
693   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
694                                      instance.name, int(time.time()))
695
696   result = utils.RunCmd([inst_os.create_script], env=create_env,
697                         cwd=inst_os.path, output=logfile,)
698   if result.failed:
699     logging.error("os create command '%s' returned error: %s, logfile: %s,"
700                   " output: %s", result.cmd, result.fail_reason, logfile,
701                   result.output)
702     lines = [utils.SafeEncode(val)
703              for val in utils.TailFile(logfile, lines=20)]
704     return (False, "OS create script failed (%s), last lines in the"
705             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
706
707   return (True, "Successfully installed")
708
709
710 def RunRenameInstance(instance, old_name):
711   """Run the OS rename script for an instance.
712
713   @type instance: L{objects.Instance}
714   @param instance: Instance whose OS is to be installed
715   @type old_name: string
716   @param old_name: previous instance name
717   @rtype: boolean
718   @return: the success of the operation
719
720   """
721   inst_os = OSFromDisk(instance.os)
722
723   rename_env = OSEnvironment(instance)
724   rename_env['OLD_INSTANCE_NAME'] = old_name
725
726   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
727                                            old_name,
728                                            instance.name, int(time.time()))
729
730   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
731                         cwd=inst_os.path, output=logfile)
732
733   if result.failed:
734     logging.error("os create command '%s' returned error: %s output: %s",
735                   result.cmd, result.fail_reason, result.output)
736     lines = [utils.SafeEncode(val)
737              for val in utils.TailFile(logfile, lines=20)]
738     return (False, "OS rename script failed (%s), last lines in the"
739             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
740
741   return (True, "Rename successful")
742
743
744 def _GetVGInfo(vg_name):
745   """Get informations about the volume group.
746
747   @type vg_name: str
748   @param vg_name: the volume group which we query
749   @rtype: dict
750   @return:
751     A dictionary with the following keys:
752       - C{vg_size} is the total size of the volume group in MiB
753       - C{vg_free} is the free size of the volume group in MiB
754       - C{pv_count} are the number of physical disks in that VG
755
756     If an error occurs during gathering of data, we return the same dict
757     with keys all set to None.
758
759   """
760   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
761
762   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
763                          "--nosuffix", "--units=m", "--separator=:", vg_name])
764
765   if retval.failed:
766     logging.error("volume group %s not present", vg_name)
767     return retdic
768   valarr = retval.stdout.strip().rstrip(':').split(':')
769   if len(valarr) == 3:
770     try:
771       retdic = {
772         "vg_size": int(round(float(valarr[0]), 0)),
773         "vg_free": int(round(float(valarr[1]), 0)),
774         "pv_count": int(valarr[2]),
775         }
776     except ValueError, err:
777       logging.exception("Fail to parse vgs output")
778   else:
779     logging.error("vgs output has the wrong number of fields (expected"
780                   " three): %s", str(valarr))
781   return retdic
782
783
784 def _GetBlockDevSymlinkPath(instance_name, idx):
785   return os.path.join(constants.DISK_LINKS_DIR,
786                       "%s:%d" % (instance_name, idx))
787
788
789 def _SymlinkBlockDev(instance_name, device_path, idx):
790   """Set up symlinks to a instance's block device.
791
792   This is an auxiliary function run when an instance is start (on the primary
793   node) or when an instance is migrated (on the target node).
794
795
796   @param instance_name: the name of the target instance
797   @param device_path: path of the physical block device, on the node
798   @param idx: the disk index
799   @return: absolute path to the disk's symlink
800
801   """
802   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
803   try:
804     os.symlink(device_path, link_name)
805   except OSError, err:
806     if err.errno == errno.EEXIST:
807       if (not os.path.islink(link_name) or
808           os.readlink(link_name) != device_path):
809         os.remove(link_name)
810         os.symlink(device_path, link_name)
811     else:
812       raise
813
814   return link_name
815
816
817 def _RemoveBlockDevLinks(instance_name, disks):
818   """Remove the block device symlinks belonging to the given instance.
819
820   """
821   for idx, disk in enumerate(disks):
822     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
823     if os.path.islink(link_name):
824       try:
825         os.remove(link_name)
826       except OSError:
827         logging.exception("Can't remove symlink '%s'", link_name)
828
829
830 def _GatherAndLinkBlockDevs(instance):
831   """Set up an instance's block device(s).
832
833   This is run on the primary node at instance startup. The block
834   devices must be already assembled.
835
836   @type instance: L{objects.Instance}
837   @param instance: the instance whose disks we shoul assemble
838   @rtype: list
839   @return: list of (disk_object, device_path)
840
841   """
842   block_devices = []
843   for idx, disk in enumerate(instance.disks):
844     device = _RecursiveFindBD(disk)
845     if device is None:
846       raise errors.BlockDeviceError("Block device '%s' is not set up." %
847                                     str(disk))
848     device.Open()
849     try:
850       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
851     except OSError, e:
852       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
853                                     e.strerror)
854
855     block_devices.append((disk, link_name))
856
857   return block_devices
858
859
860 def StartInstance(instance, extra_args):
861   """Start an instance.
862
863   @type instance: L{objects.Instance}
864   @param instance: the instance object
865   @rtype: boolean
866   @return: whether the startup was successful or not
867
868   """
869   running_instances = GetInstanceList([instance.hypervisor])
870
871   if instance.name in running_instances:
872     return (True, "Already running")
873
874   try:
875     block_devices = _GatherAndLinkBlockDevs(instance)
876     hyper = hypervisor.GetHypervisor(instance.hypervisor)
877     hyper.StartInstance(instance, block_devices, extra_args)
878   except errors.BlockDeviceError, err:
879     logging.exception("Failed to start instance")
880     return (False, "Block device error: %s" % str(err))
881   except errors.HypervisorError, err:
882     logging.exception("Failed to start instance")
883     _RemoveBlockDevLinks(instance.name, instance.disks)
884     return (False, "Hypervisor error: %s" % str(err))
885
886   return (True, "Instance started successfully")
887
888
889 def ShutdownInstance(instance):
890   """Shut an instance down.
891
892   @note: this functions uses polling with a hardcoded timeout.
893
894   @type instance: L{objects.Instance}
895   @param instance: the instance object
896   @rtype: boolean
897   @return: whether the startup was successful or not
898
899   """
900   hv_name = instance.hypervisor
901   running_instances = GetInstanceList([hv_name])
902
903   if instance.name not in running_instances:
904     return True
905
906   hyper = hypervisor.GetHypervisor(hv_name)
907   try:
908     hyper.StopInstance(instance)
909   except errors.HypervisorError, err:
910     logging.error("Failed to stop instance: %s" % err)
911     return False
912
913   # test every 10secs for 2min
914
915   time.sleep(1)
916   for dummy in range(11):
917     if instance.name not in GetInstanceList([hv_name]):
918       break
919     time.sleep(10)
920   else:
921     # the shutdown did not succeed
922     logging.error("Shutdown of '%s' unsuccessful, using destroy",
923                   instance.name)
924
925     try:
926       hyper.StopInstance(instance, force=True)
927     except errors.HypervisorError, err:
928       logging.exception("Failed to stop instance: %s" % err)
929       return False
930
931     time.sleep(1)
932     if instance.name in GetInstanceList([hv_name]):
933       logging.error("Could not shutdown instance '%s' even by destroy",
934                     instance.name)
935       return False
936
937   _RemoveBlockDevLinks(instance.name, instance.disks)
938
939   return True
940
941
942 def InstanceReboot(instance, reboot_type, extra_args):
943   """Reboot an instance.
944
945   @type instance: L{objects.Instance}
946   @param instance: the instance object to reboot
947   @type reboot_type: str
948   @param reboot_type: the type of reboot, one the following
949     constants:
950       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
951         instance OS, do not recreate the VM
952       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
953         restart the VM (at the hypervisor level)
954       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
955         is not accepted here, since that mode is handled
956         differently
957   @rtype: boolean
958   @return: the success of the operation
959
960   """
961   running_instances = GetInstanceList([instance.hypervisor])
962
963   if instance.name not in running_instances:
964     msg = "Cannot reboot instance %s that is not running" % instance.name
965     logging.error(msg)
966     return (False, msg)
967
968   hyper = hypervisor.GetHypervisor(instance.hypervisor)
969   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
970     try:
971       hyper.RebootInstance(instance)
972     except errors.HypervisorError, err:
973       msg = "Failed to soft reboot instance %s: %s" % (instance.name, err)
974       logging.error(msg)
975       return (False, msg)
976   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
977     try:
978       ShutdownInstance(instance)
979       StartInstance(instance, extra_args)
980     except errors.HypervisorError, err:
981       msg = "Failed to hard reboot instance %s: %s" % (instance.name, err)
982       logging.error(msg)
983       return (False, msg)
984   else:
985     return (False, "Invalid reboot_type received: %s" % (reboot_type,))
986
987   return (True, "Reboot successful")
988
989
990 def MigrationInfo(instance):
991   """Gather information about an instance to be migrated.
992
993   @type instance: L{objects.Instance}
994   @param instance: the instance definition
995
996   """
997   hyper = hypervisor.GetHypervisor(instance.hypervisor)
998   try:
999     info = hyper.MigrationInfo(instance)
1000   except errors.HypervisorError, err:
1001     msg = "Failed to fetch migration information"
1002     logging.exception(msg)
1003     return (False, '%s: %s' % (msg, err))
1004   return (True, info)
1005
1006
1007 def AcceptInstance(instance, info, target):
1008   """Prepare the node to accept an instance.
1009
1010   @type instance: L{objects.Instance}
1011   @param instance: the instance definition
1012   @type info: string/data (opaque)
1013   @param info: migration information, from the source node
1014   @type target: string
1015   @param target: target host (usually ip), on this node
1016
1017   """
1018   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1019   try:
1020     hyper.AcceptInstance(instance, info, target)
1021   except errors.HypervisorError, err:
1022     msg = "Failed to accept instance"
1023     logging.exception(msg)
1024     return (False, '%s: %s' % (msg, err))
1025   return (True, "Accept successfull")
1026
1027
1028 def FinalizeMigration(instance, info, success):
1029   """Finalize any preparation to accept an instance.
1030
1031   @type instance: L{objects.Instance}
1032   @param instance: the instance definition
1033   @type info: string/data (opaque)
1034   @param info: migration information, from the source node
1035   @type success: boolean
1036   @param success: whether the migration was a success or a failure
1037
1038   """
1039   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1040   try:
1041     hyper.FinalizeMigration(instance, info, success)
1042   except errors.HypervisorError, err:
1043     msg = "Failed to finalize migration"
1044     logging.exception(msg)
1045     return (False, '%s: %s' % (msg, err))
1046   return (True, "Migration Finalized")
1047
1048
1049 def MigrateInstance(instance, target, live):
1050   """Migrates an instance to another node.
1051
1052   @type instance: L{objects.Instance}
1053   @param instance: the instance definition
1054   @type target: string
1055   @param target: the target node name
1056   @type live: boolean
1057   @param live: whether the migration should be done live or not (the
1058       interpretation of this parameter is left to the hypervisor)
1059   @rtype: tuple
1060   @return: a tuple of (success, msg) where:
1061       - succes is a boolean denoting the success/failure of the operation
1062       - msg is a string with details in case of failure
1063
1064   """
1065   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1066
1067   try:
1068     hyper.MigrateInstance(instance.name, target, live)
1069   except errors.HypervisorError, err:
1070     msg = "Failed to migrate instance"
1071     logging.exception(msg)
1072     return (False, "%s: %s" % (msg, err))
1073   return (True, "Migration successfull")
1074
1075
1076 def BlockdevCreate(disk, size, owner, on_primary, info):
1077   """Creates a block device for an instance.
1078
1079   @type disk: L{objects.Disk}
1080   @param disk: the object describing the disk we should create
1081   @type size: int
1082   @param size: the size of the physical underlying device, in MiB
1083   @type owner: str
1084   @param owner: the name of the instance for which disk is created,
1085       used for device cache data
1086   @type on_primary: boolean
1087   @param on_primary:  indicates if it is the primary node or not
1088   @type info: string
1089   @param info: string that will be sent to the physical device
1090       creation, used for example to set (LVM) tags on LVs
1091
1092   @return: the new unique_id of the device (this can sometime be
1093       computed only after creation), or None. On secondary nodes,
1094       it's not required to return anything.
1095
1096   """
1097   clist = []
1098   if disk.children:
1099     for child in disk.children:
1100       try:
1101         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1102       except errors.BlockDeviceError, err:
1103         errmsg = "Can't assemble device %s: %s" % (child, err)
1104         logging.error(errmsg)
1105         return False, errmsg
1106       if on_primary or disk.AssembleOnSecondary():
1107         # we need the children open in case the device itself has to
1108         # be assembled
1109         try:
1110           crdev.Open()
1111         except errors.BlockDeviceError, err:
1112           errmsg = "Can't make child '%s' read-write: %s" % (child, err)
1113           logging.error(errmsg)
1114           return False, errmsg
1115       clist.append(crdev)
1116
1117   try:
1118     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1119   except errors.BlockDeviceError, err:
1120     return False, "Can't create block device: %s" % str(err)
1121
1122   if on_primary or disk.AssembleOnSecondary():
1123     try:
1124       device.Assemble()
1125     except errors.BlockDeviceError, err:
1126       errmsg = ("Can't assemble device after creation, very"
1127                 " unusual event: %s" % str(err))
1128       logging.error(errmsg)
1129       return False, errmsg
1130     device.SetSyncSpeed(constants.SYNC_SPEED)
1131     if on_primary or disk.OpenOnSecondary():
1132       try:
1133         device.Open(force=True)
1134       except errors.BlockDeviceError, err:
1135         errmsg = ("Can't make device r/w after creation, very"
1136                   " unusual event: %s" % str(err))
1137         logging.error(errmsg)
1138         return False, errmsg
1139     DevCacheManager.UpdateCache(device.dev_path, owner,
1140                                 on_primary, disk.iv_name)
1141
1142   device.SetInfo(info)
1143
1144   physical_id = device.unique_id
1145   return True, physical_id
1146
1147
1148 def BlockdevRemove(disk):
1149   """Remove a block device.
1150
1151   @note: This is intended to be called recursively.
1152
1153   @type disk: L{objects.Disk}
1154   @param disk: the disk object we should remove
1155   @rtype: boolean
1156   @return: the success of the operation
1157
1158   """
1159   msgs = []
1160   result = True
1161   try:
1162     rdev = _RecursiveFindBD(disk)
1163   except errors.BlockDeviceError, err:
1164     # probably can't attach
1165     logging.info("Can't attach to device %s in remove", disk)
1166     rdev = None
1167   if rdev is not None:
1168     r_path = rdev.dev_path
1169     try:
1170       rdev.Remove()
1171     except errors.BlockDeviceError, err:
1172       msgs.append(str(err))
1173       result = False
1174     if result:
1175       DevCacheManager.RemoveCache(r_path)
1176
1177   if disk.children:
1178     for child in disk.children:
1179       c_status, c_msg = BlockdevRemove(child)
1180       result = result and c_status
1181       if c_msg: # not an empty message
1182         msgs.append(c_msg)
1183
1184   return (result, "; ".join(msgs))
1185
1186
1187 def _RecursiveAssembleBD(disk, owner, as_primary):
1188   """Activate a block device for an instance.
1189
1190   This is run on the primary and secondary nodes for an instance.
1191
1192   @note: this function is called recursively.
1193
1194   @type disk: L{objects.Disk}
1195   @param disk: the disk we try to assemble
1196   @type owner: str
1197   @param owner: the name of the instance which owns the disk
1198   @type as_primary: boolean
1199   @param as_primary: if we should make the block device
1200       read/write
1201
1202   @return: the assembled device or None (in case no device
1203       was assembled)
1204   @raise errors.BlockDeviceError: in case there is an error
1205       during the activation of the children or the device
1206       itself
1207
1208   """
1209   children = []
1210   if disk.children:
1211     mcn = disk.ChildrenNeeded()
1212     if mcn == -1:
1213       mcn = 0 # max number of Nones allowed
1214     else:
1215       mcn = len(disk.children) - mcn # max number of Nones
1216     for chld_disk in disk.children:
1217       try:
1218         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1219       except errors.BlockDeviceError, err:
1220         if children.count(None) >= mcn:
1221           raise
1222         cdev = None
1223         logging.error("Error in child activation (but continuing): %s",
1224                       str(err))
1225       children.append(cdev)
1226
1227   if as_primary or disk.AssembleOnSecondary():
1228     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1229     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1230     result = r_dev
1231     if as_primary or disk.OpenOnSecondary():
1232       r_dev.Open()
1233     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1234                                 as_primary, disk.iv_name)
1235
1236   else:
1237     result = True
1238   return result
1239
1240
1241 def BlockdevAssemble(disk, owner, as_primary):
1242   """Activate a block device for an instance.
1243
1244   This is a wrapper over _RecursiveAssembleBD.
1245
1246   @rtype: str or boolean
1247   @return: a C{/dev/...} path for primary nodes, and
1248       C{True} for secondary nodes
1249
1250   """
1251   status = True
1252   result = "no error information"
1253   try:
1254     result = _RecursiveAssembleBD(disk, owner, as_primary)
1255     if isinstance(result, bdev.BlockDev):
1256       result = result.dev_path
1257   except errors.BlockDeviceError, err:
1258     result = "Error while assembling disk: %s" % str(err)
1259     status = False
1260   return (status, result)
1261
1262
1263 def BlockdevShutdown(disk):
1264   """Shut down a block device.
1265
1266   First, if the device is assembled (Attach() is successfull), then
1267   the device is shutdown. Then the children of the device are
1268   shutdown.
1269
1270   This function is called recursively. Note that we don't cache the
1271   children or such, as oppossed to assemble, shutdown of different
1272   devices doesn't require that the upper device was active.
1273
1274   @type disk: L{objects.Disk}
1275   @param disk: the description of the disk we should
1276       shutdown
1277   @rtype: boolean
1278   @return: the success of the operation
1279
1280   """
1281   msgs = []
1282   result = True
1283   r_dev = _RecursiveFindBD(disk)
1284   if r_dev is not None:
1285     r_path = r_dev.dev_path
1286     try:
1287       r_dev.Shutdown()
1288       DevCacheManager.RemoveCache(r_path)
1289     except errors.BlockDeviceError, err:
1290       msgs.append(str(err))
1291       result = False
1292
1293   if disk.children:
1294     for child in disk.children:
1295       c_status, c_msg = BlockdevShutdown(child)
1296       result = result and c_status
1297       if c_msg: # not an empty message
1298         msgs.append(c_msg)
1299
1300   return (result, "; ".join(msgs))
1301
1302
1303 def BlockdevAddchildren(parent_cdev, new_cdevs):
1304   """Extend a mirrored block device.
1305
1306   @type parent_cdev: L{objects.Disk}
1307   @param parent_cdev: the disk to which we should add children
1308   @type new_cdevs: list of L{objects.Disk}
1309   @param new_cdevs: the list of children which we should add
1310   @rtype: boolean
1311   @return: the success of the operation
1312
1313   """
1314   parent_bdev = _RecursiveFindBD(parent_cdev)
1315   if parent_bdev is None:
1316     logging.error("Can't find parent device")
1317     return False
1318   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1319   if new_bdevs.count(None) > 0:
1320     logging.error("Can't find new device(s) to add: %s:%s",
1321                   new_bdevs, new_cdevs)
1322     return False
1323   parent_bdev.AddChildren(new_bdevs)
1324   return True
1325
1326
1327 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1328   """Shrink a mirrored block device.
1329
1330   @type parent_cdev: L{objects.Disk}
1331   @param parent_cdev: the disk from which we should remove children
1332   @type new_cdevs: list of L{objects.Disk}
1333   @param new_cdevs: the list of children which we should remove
1334   @rtype: boolean
1335   @return: the success of the operation
1336
1337   """
1338   parent_bdev = _RecursiveFindBD(parent_cdev)
1339   if parent_bdev is None:
1340     logging.error("Can't find parent in remove children: %s", parent_cdev)
1341     return False
1342   devs = []
1343   for disk in new_cdevs:
1344     rpath = disk.StaticDevPath()
1345     if rpath is None:
1346       bd = _RecursiveFindBD(disk)
1347       if bd is None:
1348         logging.error("Can't find dynamic device %s while removing children",
1349                       disk)
1350         return False
1351       else:
1352         devs.append(bd.dev_path)
1353     else:
1354       devs.append(rpath)
1355   parent_bdev.RemoveChildren(devs)
1356   return True
1357
1358
1359 def BlockdevGetmirrorstatus(disks):
1360   """Get the mirroring status of a list of devices.
1361
1362   @type disks: list of L{objects.Disk}
1363   @param disks: the list of disks which we should query
1364   @rtype: disk
1365   @return:
1366       a list of (mirror_done, estimated_time) tuples, which
1367       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1368   @raise errors.BlockDeviceError: if any of the disks cannot be
1369       found
1370
1371   """
1372   stats = []
1373   for dsk in disks:
1374     rbd = _RecursiveFindBD(dsk)
1375     if rbd is None:
1376       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1377     stats.append(rbd.CombinedSyncStatus())
1378   return stats
1379
1380
1381 def _RecursiveFindBD(disk):
1382   """Check if a device is activated.
1383
1384   If so, return informations about the real device.
1385
1386   @type disk: L{objects.Disk}
1387   @param disk: the disk object we need to find
1388
1389   @return: None if the device can't be found,
1390       otherwise the device instance
1391
1392   """
1393   children = []
1394   if disk.children:
1395     for chdisk in disk.children:
1396       children.append(_RecursiveFindBD(chdisk))
1397
1398   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1399
1400
1401 def BlockdevFind(disk):
1402   """Check if a device is activated.
1403
1404   If it is, return informations about the real device.
1405
1406   @type disk: L{objects.Disk}
1407   @param disk: the disk to find
1408   @rtype: None or tuple
1409   @return: None if the disk cannot be found, otherwise a
1410       tuple (device_path, major, minor, sync_percent,
1411       estimated_time, is_degraded)
1412
1413   """
1414   try:
1415     rbd = _RecursiveFindBD(disk)
1416   except errors.BlockDeviceError, err:
1417     return (False, str(err))
1418   if rbd is None:
1419     return (True, None)
1420   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1421
1422
1423 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1424   """Write a file to the filesystem.
1425
1426   This allows the master to overwrite(!) a file. It will only perform
1427   the operation if the file belongs to a list of configuration files.
1428
1429   @type file_name: str
1430   @param file_name: the target file name
1431   @type data: str
1432   @param data: the new contents of the file
1433   @type mode: int
1434   @param mode: the mode to give the file (can be None)
1435   @type uid: int
1436   @param uid: the owner of the file (can be -1 for default)
1437   @type gid: int
1438   @param gid: the group of the file (can be -1 for default)
1439   @type atime: float
1440   @param atime: the atime to set on the file (can be None)
1441   @type mtime: float
1442   @param mtime: the mtime to set on the file (can be None)
1443   @rtype: boolean
1444   @return: the success of the operation; errors are logged
1445       in the node daemon log
1446
1447   """
1448   if not os.path.isabs(file_name):
1449     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1450                   file_name)
1451     return False
1452
1453   allowed_files = [
1454     constants.CLUSTER_CONF_FILE,
1455     constants.ETC_HOSTS,
1456     constants.SSH_KNOWN_HOSTS_FILE,
1457     constants.VNC_PASSWORD_FILE,
1458     ]
1459
1460   if file_name not in allowed_files:
1461     logging.error("Filename passed to UploadFile not in allowed"
1462                  " upload targets: '%s'", file_name)
1463     return False
1464
1465   raw_data = _Decompress(data)
1466
1467   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1468                   atime=atime, mtime=mtime)
1469   return True
1470
1471
1472 def WriteSsconfFiles(values):
1473   """Update all ssconf files.
1474
1475   Wrapper around the SimpleStore.WriteFiles.
1476
1477   """
1478   ssconf.SimpleStore().WriteFiles(values)
1479
1480
1481 def _ErrnoOrStr(err):
1482   """Format an EnvironmentError exception.
1483
1484   If the L{err} argument has an errno attribute, it will be looked up
1485   and converted into a textual C{E...} description. Otherwise the
1486   string representation of the error will be returned.
1487
1488   @type err: L{EnvironmentError}
1489   @param err: the exception to format
1490
1491   """
1492   if hasattr(err, 'errno'):
1493     detail = errno.errorcode[err.errno]
1494   else:
1495     detail = str(err)
1496   return detail
1497
1498
1499 def _OSOndiskVersion(name, os_dir):
1500   """Compute and return the API version of a given OS.
1501
1502   This function will try to read the API version of the OS given by
1503   the 'name' parameter and residing in the 'os_dir' directory.
1504
1505   @type name: str
1506   @param name: the OS name we should look for
1507   @type os_dir: str
1508   @param os_dir: the directory inwhich we should look for the OS
1509   @rtype: int or None
1510   @return:
1511       Either an integer denoting the version or None in the
1512       case when this is not a valid OS name.
1513   @raise errors.InvalidOS: if the OS cannot be found
1514
1515   """
1516   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1517
1518   try:
1519     st = os.stat(api_file)
1520   except EnvironmentError, err:
1521     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1522                            " found (%s)" % _ErrnoOrStr(err))
1523
1524   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1525     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1526                            " a regular file")
1527
1528   try:
1529     f = open(api_file)
1530     try:
1531       api_versions = f.readlines()
1532     finally:
1533       f.close()
1534   except EnvironmentError, err:
1535     raise errors.InvalidOS(name, os_dir, "error while reading the"
1536                            " API version (%s)" % _ErrnoOrStr(err))
1537
1538   api_versions = [version.strip() for version in api_versions]
1539   try:
1540     api_versions = [int(version) for version in api_versions]
1541   except (TypeError, ValueError), err:
1542     raise errors.InvalidOS(name, os_dir,
1543                            "API version is not integer (%s)" % str(err))
1544
1545   return api_versions
1546
1547
1548 def DiagnoseOS(top_dirs=None):
1549   """Compute the validity for all OSes.
1550
1551   @type top_dirs: list
1552   @param top_dirs: the list of directories in which to
1553       search (if not given defaults to
1554       L{constants.OS_SEARCH_PATH})
1555   @rtype: list of L{objects.OS}
1556   @return: an OS object for each name in all the given
1557       directories
1558
1559   """
1560   if top_dirs is None:
1561     top_dirs = constants.OS_SEARCH_PATH
1562
1563   result = []
1564   for dir_name in top_dirs:
1565     if os.path.isdir(dir_name):
1566       try:
1567         f_names = utils.ListVisibleFiles(dir_name)
1568       except EnvironmentError, err:
1569         logging.exception("Can't list the OS directory %s", dir_name)
1570         break
1571       for name in f_names:
1572         try:
1573           os_inst = OSFromDisk(name, base_dir=dir_name)
1574           result.append(os_inst)
1575         except errors.InvalidOS, err:
1576           result.append(objects.OS.FromInvalidOS(err))
1577
1578   return result
1579
1580
1581 def OSFromDisk(name, base_dir=None):
1582   """Create an OS instance from disk.
1583
1584   This function will return an OS instance if the given name is a
1585   valid OS name. Otherwise, it will raise an appropriate
1586   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1587
1588   @type base_dir: string
1589   @keyword base_dir: Base directory containing OS installations.
1590                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1591   @rtype: L{objects.OS}
1592   @return: the OS instance if we find a valid one
1593   @raise errors.InvalidOS: if we don't find a valid OS
1594
1595   """
1596   if base_dir is None:
1597     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1598     if os_dir is None:
1599       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1600   else:
1601     os_dir = os.path.sep.join([base_dir, name])
1602
1603   api_versions = _OSOndiskVersion(name, os_dir)
1604
1605   if constants.OS_API_VERSION not in api_versions:
1606     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1607                            " (found %s want %s)"
1608                            % (api_versions, constants.OS_API_VERSION))
1609
1610   # OS Scripts dictionary, we will populate it with the actual script names
1611   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1612
1613   for script in os_scripts:
1614     os_scripts[script] = os.path.sep.join([os_dir, script])
1615
1616     try:
1617       st = os.stat(os_scripts[script])
1618     except EnvironmentError, err:
1619       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1620                              (script, _ErrnoOrStr(err)))
1621
1622     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1623       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1624                              script)
1625
1626     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1627       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1628                              script)
1629
1630
1631   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1632                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1633                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1634                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1635                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1636                     api_versions=api_versions)
1637
1638 def OSEnvironment(instance, debug=0):
1639   """Calculate the environment for an os script.
1640
1641   @type instance: L{objects.Instance}
1642   @param instance: target instance for the os script run
1643   @type debug: integer
1644   @param debug: debug level (0 or 1, for OS Api 10)
1645   @rtype: dict
1646   @return: dict of environment variables
1647   @raise errors.BlockDeviceError: if the block device
1648       cannot be found
1649
1650   """
1651   result = {}
1652   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1653   result['INSTANCE_NAME'] = instance.name
1654   result['INSTANCE_OS'] = instance.os
1655   result['HYPERVISOR'] = instance.hypervisor
1656   result['DISK_COUNT'] = '%d' % len(instance.disks)
1657   result['NIC_COUNT'] = '%d' % len(instance.nics)
1658   result['DEBUG_LEVEL'] = '%d' % debug
1659   for idx, disk in enumerate(instance.disks):
1660     real_disk = _RecursiveFindBD(disk)
1661     if real_disk is None:
1662       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1663                                     str(disk))
1664     real_disk.Open()
1665     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1666     # FIXME: When disks will have read-only mode, populate this
1667     result['DISK_%d_ACCESS' % idx] = disk.mode
1668     if constants.HV_DISK_TYPE in instance.hvparams:
1669       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1670         instance.hvparams[constants.HV_DISK_TYPE]
1671     if disk.dev_type in constants.LDS_BLOCK:
1672       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1673     elif disk.dev_type == constants.LD_FILE:
1674       result['DISK_%d_BACKEND_TYPE' % idx] = \
1675         'file:%s' % disk.physical_id[0]
1676   for idx, nic in enumerate(instance.nics):
1677     result['NIC_%d_MAC' % idx] = nic.mac
1678     if nic.ip:
1679       result['NIC_%d_IP' % idx] = nic.ip
1680     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1681     if constants.HV_NIC_TYPE in instance.hvparams:
1682       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1683         instance.hvparams[constants.HV_NIC_TYPE]
1684
1685   return result
1686
1687 def BlockdevGrow(disk, amount):
1688   """Grow a stack of block devices.
1689
1690   This function is called recursively, with the childrens being the
1691   first ones to resize.
1692
1693   @type disk: L{objects.Disk}
1694   @param disk: the disk to be grown
1695   @rtype: (status, result)
1696   @return: a tuple with the status of the operation
1697       (True/False), and the errors message if status
1698       is False
1699
1700   """
1701   r_dev = _RecursiveFindBD(disk)
1702   if r_dev is None:
1703     return False, "Cannot find block device %s" % (disk,)
1704
1705   try:
1706     r_dev.Grow(amount)
1707   except errors.BlockDeviceError, err:
1708     return False, str(err)
1709
1710   return True, None
1711
1712
1713 def BlockdevSnapshot(disk):
1714   """Create a snapshot copy of a block device.
1715
1716   This function is called recursively, and the snapshot is actually created
1717   just for the leaf lvm backend device.
1718
1719   @type disk: L{objects.Disk}
1720   @param disk: the disk to be snapshotted
1721   @rtype: string
1722   @return: snapshot disk path
1723
1724   """
1725   if disk.children:
1726     if len(disk.children) == 1:
1727       # only one child, let's recurse on it
1728       return BlockdevSnapshot(disk.children[0])
1729     else:
1730       # more than one child, choose one that matches
1731       for child in disk.children:
1732         if child.size == disk.size:
1733           # return implies breaking the loop
1734           return BlockdevSnapshot(child)
1735   elif disk.dev_type == constants.LD_LV:
1736     r_dev = _RecursiveFindBD(disk)
1737     if r_dev is not None:
1738       # let's stay on the safe side and ask for the full size, for now
1739       return r_dev.Snapshot(disk.size)
1740     else:
1741       return None
1742   else:
1743     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1744                                  " '%s' of type '%s'" %
1745                                  (disk.unique_id, disk.dev_type))
1746
1747
1748 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1749   """Export a block device snapshot to a remote node.
1750
1751   @type disk: L{objects.Disk}
1752   @param disk: the description of the disk to export
1753   @type dest_node: str
1754   @param dest_node: the destination node to export to
1755   @type instance: L{objects.Instance}
1756   @param instance: the instance object to whom the disk belongs
1757   @type cluster_name: str
1758   @param cluster_name: the cluster name, needed for SSH hostalias
1759   @type idx: int
1760   @param idx: the index of the disk in the instance's disk list,
1761       used to export to the OS scripts environment
1762   @rtype: boolean
1763   @return: the success of the operation
1764
1765   """
1766   export_env = OSEnvironment(instance)
1767
1768   inst_os = OSFromDisk(instance.os)
1769   export_script = inst_os.export_script
1770
1771   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1772                                      instance.name, int(time.time()))
1773   if not os.path.exists(constants.LOG_OS_DIR):
1774     os.mkdir(constants.LOG_OS_DIR, 0750)
1775   real_disk = _RecursiveFindBD(disk)
1776   if real_disk is None:
1777     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1778                                   str(disk))
1779   real_disk.Open()
1780
1781   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1782   export_env['EXPORT_INDEX'] = str(idx)
1783
1784   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1785   destfile = disk.physical_id[1]
1786
1787   # the target command is built out of three individual commands,
1788   # which are joined by pipes; we check each individual command for
1789   # valid parameters
1790   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1791                                export_script, logfile)
1792
1793   comprcmd = "gzip"
1794
1795   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1796                                 destdir, destdir, destfile)
1797   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1798                                                    constants.GANETI_RUNAS,
1799                                                    destcmd)
1800
1801   # all commands have been checked, so we're safe to combine them
1802   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1803
1804   result = utils.RunCmd(command, env=export_env)
1805
1806   if result.failed:
1807     logging.error("os snapshot export command '%s' returned error: %s"
1808                   " output: %s", command, result.fail_reason, result.output)
1809     return False
1810
1811   return True
1812
1813
1814 def FinalizeExport(instance, snap_disks):
1815   """Write out the export configuration information.
1816
1817   @type instance: L{objects.Instance}
1818   @param instance: the instance which we export, used for
1819       saving configuration
1820   @type snap_disks: list of L{objects.Disk}
1821   @param snap_disks: list of snapshot block devices, which
1822       will be used to get the actual name of the dump file
1823
1824   @rtype: boolean
1825   @return: the success of the operation
1826
1827   """
1828   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1829   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1830
1831   config = objects.SerializableConfigParser()
1832
1833   config.add_section(constants.INISECT_EXP)
1834   config.set(constants.INISECT_EXP, 'version', '0')
1835   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1836   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1837   config.set(constants.INISECT_EXP, 'os', instance.os)
1838   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1839
1840   config.add_section(constants.INISECT_INS)
1841   config.set(constants.INISECT_INS, 'name', instance.name)
1842   config.set(constants.INISECT_INS, 'memory', '%d' %
1843              instance.beparams[constants.BE_MEMORY])
1844   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1845              instance.beparams[constants.BE_VCPUS])
1846   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1847
1848   nic_total = 0
1849   for nic_count, nic in enumerate(instance.nics):
1850     nic_total += 1
1851     config.set(constants.INISECT_INS, 'nic%d_mac' %
1852                nic_count, '%s' % nic.mac)
1853     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1854     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1855                '%s' % nic.bridge)
1856   # TODO: redundant: on load can read nics until it doesn't exist
1857   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1858
1859   disk_total = 0
1860   for disk_count, disk in enumerate(snap_disks):
1861     if disk:
1862       disk_total += 1
1863       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1864                  ('%s' % disk.iv_name))
1865       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1866                  ('%s' % disk.physical_id[1]))
1867       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1868                  ('%d' % disk.size))
1869
1870   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1871
1872   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1873                   data=config.Dumps())
1874   shutil.rmtree(finaldestdir, True)
1875   shutil.move(destdir, finaldestdir)
1876
1877   return True
1878
1879
1880 def ExportInfo(dest):
1881   """Get export configuration information.
1882
1883   @type dest: str
1884   @param dest: directory containing the export
1885
1886   @rtype: L{objects.SerializableConfigParser}
1887   @return: a serializable config file containing the
1888       export info
1889
1890   """
1891   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1892
1893   config = objects.SerializableConfigParser()
1894   config.read(cff)
1895
1896   if (not config.has_section(constants.INISECT_EXP) or
1897       not config.has_section(constants.INISECT_INS)):
1898     return None
1899
1900   return config
1901
1902
1903 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1904   """Import an os image into an instance.
1905
1906   @type instance: L{objects.Instance}
1907   @param instance: instance to import the disks into
1908   @type src_node: string
1909   @param src_node: source node for the disk images
1910   @type src_images: list of string
1911   @param src_images: absolute paths of the disk images
1912   @rtype: list of boolean
1913   @return: each boolean represent the success of importing the n-th disk
1914
1915   """
1916   import_env = OSEnvironment(instance)
1917   inst_os = OSFromDisk(instance.os)
1918   import_script = inst_os.import_script
1919
1920   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1921                                         instance.name, int(time.time()))
1922   if not os.path.exists(constants.LOG_OS_DIR):
1923     os.mkdir(constants.LOG_OS_DIR, 0750)
1924
1925   comprcmd = "gunzip"
1926   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1927                                import_script, logfile)
1928
1929   final_result = []
1930   for idx, image in enumerate(src_images):
1931     if image:
1932       destcmd = utils.BuildShellCmd('cat %s', image)
1933       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1934                                                        constants.GANETI_RUNAS,
1935                                                        destcmd)
1936       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1937       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1938       import_env['IMPORT_INDEX'] = str(idx)
1939       result = utils.RunCmd(command, env=import_env)
1940       if result.failed:
1941         logging.error("Disk import command '%s' returned error: %s"
1942                       " output: %s", command, result.fail_reason,
1943                       result.output)
1944         final_result.append(False)
1945       else:
1946         final_result.append(True)
1947     else:
1948       final_result.append(True)
1949
1950   return final_result
1951
1952
1953 def ListExports():
1954   """Return a list of exports currently available on this machine.
1955
1956   @rtype: list
1957   @return: list of the exports
1958
1959   """
1960   if os.path.isdir(constants.EXPORT_DIR):
1961     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1962   else:
1963     return []
1964
1965
1966 def RemoveExport(export):
1967   """Remove an existing export from the node.
1968
1969   @type export: str
1970   @param export: the name of the export to remove
1971   @rtype: boolean
1972   @return: the success of the operation
1973
1974   """
1975   target = os.path.join(constants.EXPORT_DIR, export)
1976
1977   shutil.rmtree(target)
1978   # TODO: catch some of the relevant exceptions and provide a pretty
1979   # error message if rmtree fails.
1980
1981   return True
1982
1983
1984 def BlockdevRename(devlist):
1985   """Rename a list of block devices.
1986
1987   @type devlist: list of tuples
1988   @param devlist: list of tuples of the form  (disk,
1989       new_logical_id, new_physical_id); disk is an
1990       L{objects.Disk} object describing the current disk,
1991       and new logical_id/physical_id is the name we
1992       rename it to
1993   @rtype: boolean
1994   @return: True if all renames succeeded, False otherwise
1995
1996   """
1997   result = True
1998   for disk, unique_id in devlist:
1999     dev = _RecursiveFindBD(disk)
2000     if dev is None:
2001       result = False
2002       continue
2003     try:
2004       old_rpath = dev.dev_path
2005       dev.Rename(unique_id)
2006       new_rpath = dev.dev_path
2007       if old_rpath != new_rpath:
2008         DevCacheManager.RemoveCache(old_rpath)
2009         # FIXME: we should add the new cache information here, like:
2010         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2011         # but we don't have the owner here - maybe parse from existing
2012         # cache? for now, we only lose lvm data when we rename, which
2013         # is less critical than DRBD or MD
2014     except errors.BlockDeviceError, err:
2015       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2016       result = False
2017   return result
2018
2019
2020 def _TransformFileStorageDir(file_storage_dir):
2021   """Checks whether given file_storage_dir is valid.
2022
2023   Checks wheter the given file_storage_dir is within the cluster-wide
2024   default file_storage_dir stored in SimpleStore. Only paths under that
2025   directory are allowed.
2026
2027   @type file_storage_dir: str
2028   @param file_storage_dir: the path to check
2029
2030   @return: the normalized path if valid, None otherwise
2031
2032   """
2033   cfg = _GetConfig()
2034   file_storage_dir = os.path.normpath(file_storage_dir)
2035   base_file_storage_dir = cfg.GetFileStorageDir()
2036   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2037       base_file_storage_dir):
2038     logging.error("file storage directory '%s' is not under base file"
2039                   " storage directory '%s'",
2040                   file_storage_dir, base_file_storage_dir)
2041     return None
2042   return file_storage_dir
2043
2044
2045 def CreateFileStorageDir(file_storage_dir):
2046   """Create file storage directory.
2047
2048   @type file_storage_dir: str
2049   @param file_storage_dir: directory to create
2050
2051   @rtype: tuple
2052   @return: tuple with first element a boolean indicating wheter dir
2053       creation was successful or not
2054
2055   """
2056   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2057   result = True,
2058   if not file_storage_dir:
2059     result = False,
2060   else:
2061     if os.path.exists(file_storage_dir):
2062       if not os.path.isdir(file_storage_dir):
2063         logging.error("'%s' is not a directory", file_storage_dir)
2064         result = False,
2065     else:
2066       try:
2067         os.makedirs(file_storage_dir, 0750)
2068       except OSError, err:
2069         logging.error("Cannot create file storage directory '%s': %s",
2070                       file_storage_dir, err)
2071         result = False,
2072   return result
2073
2074
2075 def RemoveFileStorageDir(file_storage_dir):
2076   """Remove file storage directory.
2077
2078   Remove it only if it's empty. If not log an error and return.
2079
2080   @type file_storage_dir: str
2081   @param file_storage_dir: the directory we should cleanup
2082   @rtype: tuple (success,)
2083   @return: tuple of one element, C{success}, denoting
2084       whether the operation was successfull
2085
2086   """
2087   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2088   result = True,
2089   if not file_storage_dir:
2090     result = False,
2091   else:
2092     if os.path.exists(file_storage_dir):
2093       if not os.path.isdir(file_storage_dir):
2094         logging.error("'%s' is not a directory", file_storage_dir)
2095         result = False,
2096       # deletes dir only if empty, otherwise we want to return False
2097       try:
2098         os.rmdir(file_storage_dir)
2099       except OSError, err:
2100         logging.exception("Cannot remove file storage directory '%s'",
2101                           file_storage_dir)
2102         result = False,
2103   return result
2104
2105
2106 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2107   """Rename the file storage directory.
2108
2109   @type old_file_storage_dir: str
2110   @param old_file_storage_dir: the current path
2111   @type new_file_storage_dir: str
2112   @param new_file_storage_dir: the name we should rename to
2113   @rtype: tuple (success,)
2114   @return: tuple of one element, C{success}, denoting
2115       whether the operation was successful
2116
2117   """
2118   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2119   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2120   result = True,
2121   if not old_file_storage_dir or not new_file_storage_dir:
2122     result = False,
2123   else:
2124     if not os.path.exists(new_file_storage_dir):
2125       if os.path.isdir(old_file_storage_dir):
2126         try:
2127           os.rename(old_file_storage_dir, new_file_storage_dir)
2128         except OSError, err:
2129           logging.exception("Cannot rename '%s' to '%s'",
2130                             old_file_storage_dir, new_file_storage_dir)
2131           result =  False,
2132       else:
2133         logging.error("'%s' is not a directory", old_file_storage_dir)
2134         result = False,
2135     else:
2136       if os.path.exists(old_file_storage_dir):
2137         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2138                       old_file_storage_dir, new_file_storage_dir)
2139         result = False,
2140   return result
2141
2142
2143 def _IsJobQueueFile(file_name):
2144   """Checks whether the given filename is in the queue directory.
2145
2146   @type file_name: str
2147   @param file_name: the file name we should check
2148   @rtype: boolean
2149   @return: whether the file is under the queue directory
2150
2151   """
2152   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2153   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2154
2155   if not result:
2156     logging.error("'%s' is not a file in the queue directory",
2157                   file_name)
2158
2159   return result
2160
2161
2162 def JobQueueUpdate(file_name, content):
2163   """Updates a file in the queue directory.
2164
2165   This is just a wrapper over L{utils.WriteFile}, with proper
2166   checking.
2167
2168   @type file_name: str
2169   @param file_name: the job file name
2170   @type content: str
2171   @param content: the new job contents
2172   @rtype: boolean
2173   @return: the success of the operation
2174
2175   """
2176   if not _IsJobQueueFile(file_name):
2177     return False
2178
2179   # Write and replace the file atomically
2180   utils.WriteFile(file_name, data=_Decompress(content))
2181
2182   return True
2183
2184
2185 def JobQueueRename(old, new):
2186   """Renames a job queue file.
2187
2188   This is just a wrapper over os.rename with proper checking.
2189
2190   @type old: str
2191   @param old: the old (actual) file name
2192   @type new: str
2193   @param new: the desired file name
2194   @rtype: boolean
2195   @return: the success of the operation
2196
2197   """
2198   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2199     return False
2200
2201   utils.RenameFile(old, new, mkdir=True)
2202
2203   return True
2204
2205
2206 def JobQueueSetDrainFlag(drain_flag):
2207   """Set the drain flag for the queue.
2208
2209   This will set or unset the queue drain flag.
2210
2211   @type drain_flag: boolean
2212   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2213   @rtype: boolean
2214   @return: always True
2215   @warning: the function always returns True
2216
2217   """
2218   if drain_flag:
2219     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2220   else:
2221     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2222
2223   return True
2224
2225
2226 def BlockdevClose(instance_name, disks):
2227   """Closes the given block devices.
2228
2229   This means they will be switched to secondary mode (in case of
2230   DRBD).
2231
2232   @param instance_name: if the argument is not empty, the symlinks
2233       of this instance will be removed
2234   @type disks: list of L{objects.Disk}
2235   @param disks: the list of disks to be closed
2236   @rtype: tuple (success, message)
2237   @return: a tuple of success and message, where success
2238       indicates the succes of the operation, and message
2239       which will contain the error details in case we
2240       failed
2241
2242   """
2243   bdevs = []
2244   for cf in disks:
2245     rd = _RecursiveFindBD(cf)
2246     if rd is None:
2247       return (False, "Can't find device %s" % cf)
2248     bdevs.append(rd)
2249
2250   msg = []
2251   for rd in bdevs:
2252     try:
2253       rd.Close()
2254     except errors.BlockDeviceError, err:
2255       msg.append(str(err))
2256   if msg:
2257     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2258   else:
2259     if instance_name:
2260       _RemoveBlockDevLinks(instance_name, disks)
2261     return (True, "All devices secondary")
2262
2263
2264 def ValidateHVParams(hvname, hvparams):
2265   """Validates the given hypervisor parameters.
2266
2267   @type hvname: string
2268   @param hvname: the hypervisor name
2269   @type hvparams: dict
2270   @param hvparams: the hypervisor parameters to be validated
2271   @rtype: tuple (success, message)
2272   @return: a tuple of success and message, where success
2273       indicates the succes of the operation, and message
2274       which will contain the error details in case we
2275       failed
2276
2277   """
2278   try:
2279     hv_type = hypervisor.GetHypervisor(hvname)
2280     hv_type.ValidateParameters(hvparams)
2281     return (True, "Validation passed")
2282   except errors.HypervisorError, err:
2283     return (False, str(err))
2284
2285
2286 def DemoteFromMC():
2287   """Demotes the current node from master candidate role.
2288
2289   """
2290   # try to ensure we're not the master by mistake
2291   master, myself = ssconf.GetMasterAndMyself()
2292   if master == myself:
2293     return (False, "ssconf status shows I'm the master node, will not demote")
2294   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2295   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2296     return (False, "The master daemon is running, will not demote")
2297   try:
2298     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2299   except EnvironmentError, err:
2300     if err.errno != errno.ENOENT:
2301       return (False, "Error while backing up cluster file: %s" % str(err))
2302   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2303   return (True, "Done")
2304
2305
2306 def _FindDisks(nodes_ip, disks):
2307   """Sets the physical ID on disks and returns the block devices.
2308
2309   """
2310   # set the correct physical ID
2311   my_name = utils.HostInfo().name
2312   for cf in disks:
2313     cf.SetPhysicalID(my_name, nodes_ip)
2314
2315   bdevs = []
2316
2317   for cf in disks:
2318     rd = _RecursiveFindBD(cf)
2319     if rd is None:
2320       return (False, "Can't find device %s" % cf)
2321     bdevs.append(rd)
2322   return (True, bdevs)
2323
2324
2325 def DrbdDisconnectNet(nodes_ip, disks):
2326   """Disconnects the network on a list of drbd devices.
2327
2328   """
2329   status, bdevs = _FindDisks(nodes_ip, disks)
2330   if not status:
2331     return status, bdevs
2332
2333   # disconnect disks
2334   for rd in bdevs:
2335     try:
2336       rd.DisconnectNet()
2337     except errors.BlockDeviceError, err:
2338       logging.exception("Failed to go into standalone mode")
2339       return (False, "Can't change network configuration: %s" % str(err))
2340   return (True, "All disks are now disconnected")
2341
2342
2343 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2344   """Attaches the network on a list of drbd devices.
2345
2346   """
2347   status, bdevs = _FindDisks(nodes_ip, disks)
2348   if not status:
2349     return status, bdevs
2350
2351   if multimaster:
2352     for idx, rd in enumerate(bdevs):
2353       try:
2354         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2355       except EnvironmentError, err:
2356         return (False, "Can't create symlink: %s" % str(err))
2357   # reconnect disks, switch to new master configuration and if
2358   # needed primary mode
2359   for rd in bdevs:
2360     try:
2361       rd.AttachNet(multimaster)
2362     except errors.BlockDeviceError, err:
2363       return (False, "Can't change network configuration: %s" % str(err))
2364   # wait until the disks are connected; we need to retry the re-attach
2365   # if the device becomes standalone, as this might happen if the one
2366   # node disconnects and reconnects in a different mode before the
2367   # other node reconnects; in this case, one or both of the nodes will
2368   # decide it has wrong configuration and switch to standalone
2369   RECONNECT_TIMEOUT = 2 * 60
2370   sleep_time = 0.100 # start with 100 miliseconds
2371   timeout_limit = time.time() + RECONNECT_TIMEOUT
2372   while time.time() < timeout_limit:
2373     all_connected = True
2374     for rd in bdevs:
2375       stats = rd.GetProcStatus()
2376       if not (stats.is_connected or stats.is_in_resync):
2377         all_connected = False
2378       if stats.is_standalone:
2379         # peer had different config info and this node became
2380         # standalone, even though this should not happen with the
2381         # new staged way of changing disk configs
2382         try:
2383           rd.ReAttachNet(multimaster)
2384         except errors.BlockDeviceError, err:
2385           return (False, "Can't change network configuration: %s" % str(err))
2386     if all_connected:
2387       break
2388     time.sleep(sleep_time)
2389     sleep_time = min(5, sleep_time * 1.5)
2390   if not all_connected:
2391     return (False, "Timeout in disk reconnecting")
2392   if multimaster:
2393     # change to primary mode
2394     for rd in bdevs:
2395       try:
2396         rd.Open()
2397       except errors.BlockDeviceError, err:
2398         return (False, "Can't change to primary mode: %s" % str(err))
2399   if multimaster:
2400     msg = "multi-master and primary"
2401   else:
2402     msg = "single-master"
2403   return (True, "Disks are now configured as %s" % msg)
2404
2405
2406 def DrbdWaitSync(nodes_ip, disks):
2407   """Wait until DRBDs have synchronized.
2408
2409   """
2410   status, bdevs = _FindDisks(nodes_ip, disks)
2411   if not status:
2412     return status, bdevs
2413
2414   min_resync = 100
2415   alldone = True
2416   failure = False
2417   for rd in bdevs:
2418     stats = rd.GetProcStatus()
2419     if not (stats.is_connected or stats.is_in_resync):
2420       failure = True
2421       break
2422     alldone = alldone and (not stats.is_in_resync)
2423     if stats.sync_percent is not None:
2424       min_resync = min(min_resync, stats.sync_percent)
2425   return (not failure, (alldone, min_resync))
2426
2427
2428 class HooksRunner(object):
2429   """Hook runner.
2430
2431   This class is instantiated on the node side (ganeti-noded) and not
2432   on the master side.
2433
2434   """
2435   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2436
2437   def __init__(self, hooks_base_dir=None):
2438     """Constructor for hooks runner.
2439
2440     @type hooks_base_dir: str or None
2441     @param hooks_base_dir: if not None, this overrides the
2442         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2443
2444     """
2445     if hooks_base_dir is None:
2446       hooks_base_dir = constants.HOOKS_BASE_DIR
2447     self._BASE_DIR = hooks_base_dir
2448
2449   @staticmethod
2450   def ExecHook(script, env):
2451     """Exec one hook script.
2452
2453     @type script: str
2454     @param script: the full path to the script
2455     @type env: dict
2456     @param env: the environment with which to exec the script
2457     @rtype: tuple (success, message)
2458     @return: a tuple of success and message, where success
2459         indicates the succes of the operation, and message
2460         which will contain the error details in case we
2461         failed
2462
2463     """
2464     # exec the process using subprocess and log the output
2465     fdstdin = None
2466     try:
2467       fdstdin = open("/dev/null", "r")
2468       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2469                                stderr=subprocess.STDOUT, close_fds=True,
2470                                shell=False, cwd="/", env=env)
2471       output = ""
2472       try:
2473         output = child.stdout.read(4096)
2474         child.stdout.close()
2475       except EnvironmentError, err:
2476         output += "Hook script error: %s" % str(err)
2477
2478       while True:
2479         try:
2480           result = child.wait()
2481           break
2482         except EnvironmentError, err:
2483           if err.errno == errno.EINTR:
2484             continue
2485           raise
2486     finally:
2487       # try not to leak fds
2488       for fd in (fdstdin, ):
2489         if fd is not None:
2490           try:
2491             fd.close()
2492           except EnvironmentError, err:
2493             # just log the error
2494             #logging.exception("Error while closing fd %s", fd)
2495             pass
2496
2497     return result == 0, utils.SafeEncode(output.strip())
2498
2499   def RunHooks(self, hpath, phase, env):
2500     """Run the scripts in the hooks directory.
2501
2502     @type hpath: str
2503     @param hpath: the path to the hooks directory which
2504         holds the scripts
2505     @type phase: str
2506     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2507         L{constants.HOOKS_PHASE_POST}
2508     @type env: dict
2509     @param env: dictionary with the environment for the hook
2510     @rtype: list
2511     @return: list of 3-element tuples:
2512       - script path
2513       - script result, either L{constants.HKR_SUCCESS} or
2514         L{constants.HKR_FAIL}
2515       - output of the script
2516
2517     @raise errors.ProgrammerError: for invalid input
2518         parameters
2519
2520     """
2521     if phase == constants.HOOKS_PHASE_PRE:
2522       suffix = "pre"
2523     elif phase == constants.HOOKS_PHASE_POST:
2524       suffix = "post"
2525     else:
2526       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2527     rr = []
2528
2529     subdir = "%s-%s.d" % (hpath, suffix)
2530     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2531     try:
2532       dir_contents = utils.ListVisibleFiles(dir_name)
2533     except OSError, err:
2534       # FIXME: must log output in case of failures
2535       return rr
2536
2537     # we use the standard python sort order,
2538     # so 00name is the recommended naming scheme
2539     dir_contents.sort()
2540     for relname in dir_contents:
2541       fname = os.path.join(dir_name, relname)
2542       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2543           self.RE_MASK.match(relname) is not None):
2544         rrval = constants.HKR_SKIP
2545         output = ""
2546       else:
2547         result, output = self.ExecHook(fname, env)
2548         if not result:
2549           rrval = constants.HKR_FAIL
2550         else:
2551           rrval = constants.HKR_SUCCESS
2552       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2553
2554     return rr
2555
2556
2557 class IAllocatorRunner(object):
2558   """IAllocator runner.
2559
2560   This class is instantiated on the node side (ganeti-noded) and not on
2561   the master side.
2562
2563   """
2564   def Run(self, name, idata):
2565     """Run an iallocator script.
2566
2567     @type name: str
2568     @param name: the iallocator script name
2569     @type idata: str
2570     @param idata: the allocator input data
2571
2572     @rtype: tuple
2573     @return: four element tuple of:
2574        - run status (one of the IARUN_ constants)
2575        - stdout
2576        - stderr
2577        - fail reason (as from L{utils.RunResult})
2578
2579     """
2580     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2581                                   os.path.isfile)
2582     if alloc_script is None:
2583       return (constants.IARUN_NOTFOUND, None, None, None)
2584
2585     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2586     try:
2587       os.write(fd, idata)
2588       os.close(fd)
2589       result = utils.RunCmd([alloc_script, fin_name])
2590       if result.failed:
2591         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2592                 result.fail_reason)
2593     finally:
2594       os.unlink(fin_name)
2595
2596     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2597
2598
2599 class DevCacheManager(object):
2600   """Simple class for managing a cache of block device information.
2601
2602   """
2603   _DEV_PREFIX = "/dev/"
2604   _ROOT_DIR = constants.BDEV_CACHE_DIR
2605
2606   @classmethod
2607   def _ConvertPath(cls, dev_path):
2608     """Converts a /dev/name path to the cache file name.
2609
2610     This replaces slashes with underscores and strips the /dev
2611     prefix. It then returns the full path to the cache file.
2612
2613     @type dev_path: str
2614     @param dev_path: the C{/dev/} path name
2615     @rtype: str
2616     @return: the converted path name
2617
2618     """
2619     if dev_path.startswith(cls._DEV_PREFIX):
2620       dev_path = dev_path[len(cls._DEV_PREFIX):]
2621     dev_path = dev_path.replace("/", "_")
2622     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2623     return fpath
2624
2625   @classmethod
2626   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2627     """Updates the cache information for a given device.
2628
2629     @type dev_path: str
2630     @param dev_path: the pathname of the device
2631     @type owner: str
2632     @param owner: the owner (instance name) of the device
2633     @type on_primary: bool
2634     @param on_primary: whether this is the primary
2635         node nor not
2636     @type iv_name: str
2637     @param iv_name: the instance-visible name of the
2638         device, as in objects.Disk.iv_name
2639
2640     @rtype: None
2641
2642     """
2643     if dev_path is None:
2644       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2645       return
2646     fpath = cls._ConvertPath(dev_path)
2647     if on_primary:
2648       state = "primary"
2649     else:
2650       state = "secondary"
2651     if iv_name is None:
2652       iv_name = "not_visible"
2653     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2654     try:
2655       utils.WriteFile(fpath, data=fdata)
2656     except EnvironmentError, err:
2657       logging.exception("Can't update bdev cache for %s", dev_path)
2658
2659   @classmethod
2660   def RemoveCache(cls, dev_path):
2661     """Remove data for a dev_path.
2662
2663     This is just a wrapper over L{utils.RemoveFile} with a converted
2664     path name and logging.
2665
2666     @type dev_path: str
2667     @param dev_path: the pathname of the device
2668
2669     @rtype: None
2670
2671     """
2672     if dev_path is None:
2673       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2674       return
2675     fpath = cls._ConvertPath(dev_path)
2676     try:
2677       utils.RemoveFile(fpath)
2678     except EnvironmentError, err:
2679       logging.exception("Can't update bdev cache for %s", dev_path)