Change BlockDev.Shutdown() failure result
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     msg = "Error while processing user ssh files"
264     logging.exception(msg)
265     return (False, "%s: %s" % (msg, err))
266
267   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
268     utils.WriteFile(name, data=content, mode=0600)
269
270   utils.AddAuthorizedKey(auth_keys, sshpub)
271
272   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
273
274   return (True, "Node added successfully")
275
276
277 def LeaveCluster():
278   """Cleans up and remove the current node.
279
280   This function cleans up and prepares the current node to be removed
281   from the cluster.
282
283   If processing is successful, then it raises an
284   L{errors.QuitGanetiException} which is used as a special case to
285   shutdown the node daemon.
286
287   """
288   _CleanDirectory(constants.DATA_DIR)
289   JobQueuePurge()
290
291   try:
292     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
293   except errors.OpExecError:
294     logging.exception("Error while processing ssh files")
295     return
296
297   f = open(pub_key, 'r')
298   try:
299     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
300   finally:
301     f.close()
302
303   utils.RemoveFile(priv_key)
304   utils.RemoveFile(pub_key)
305
306   # Return a reassuring string to the caller, and quit
307   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
308
309
310 def GetNodeInfo(vgname, hypervisor_type):
311   """Gives back a hash with different informations about the node.
312
313   @type vgname: C{string}
314   @param vgname: the name of the volume group to ask for disk space information
315   @type hypervisor_type: C{str}
316   @param hypervisor_type: the name of the hypervisor to ask for
317       memory information
318   @rtype: C{dict}
319   @return: dictionary with the following keys:
320       - vg_size is the size of the configured volume group in MiB
321       - vg_free is the free size of the volume group in MiB
322       - memory_dom0 is the memory allocated for domain0 in MiB
323       - memory_free is the currently available (free) ram in MiB
324       - memory_total is the total number of ram in MiB
325
326   """
327   outputarray = {}
328   vginfo = _GetVGInfo(vgname)
329   outputarray['vg_size'] = vginfo['vg_size']
330   outputarray['vg_free'] = vginfo['vg_free']
331
332   hyper = hypervisor.GetHypervisor(hypervisor_type)
333   hyp_info = hyper.GetNodeInfo()
334   if hyp_info is not None:
335     outputarray.update(hyp_info)
336
337   f = open("/proc/sys/kernel/random/boot_id", 'r')
338   try:
339     outputarray["bootid"] = f.read(128).rstrip("\n")
340   finally:
341     f.close()
342
343   return outputarray
344
345
346 def VerifyNode(what, cluster_name):
347   """Verify the status of the local node.
348
349   Based on the input L{what} parameter, various checks are done on the
350   local node.
351
352   If the I{filelist} key is present, this list of
353   files is checksummed and the file/checksum pairs are returned.
354
355   If the I{nodelist} key is present, we check that we have
356   connectivity via ssh with the target nodes (and check the hostname
357   report).
358
359   If the I{node-net-test} key is present, we check that we have
360   connectivity to the given nodes via both primary IP and, if
361   applicable, secondary IPs.
362
363   @type what: C{dict}
364   @param what: a dictionary of things to check:
365       - filelist: list of files for which to compute checksums
366       - nodelist: list of nodes we should check ssh communication with
367       - node-net-test: list of nodes we should check node daemon port
368         connectivity with
369       - hypervisor: list with hypervisors to run the verify for
370   @rtype: dict
371   @return: a dictionary with the same keys as the input dict, and
372       values representing the result of the checks
373
374   """
375   result = {}
376
377   if constants.NV_HYPERVISOR in what:
378     result[constants.NV_HYPERVISOR] = tmp = {}
379     for hv_name in what[constants.NV_HYPERVISOR]:
380       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
381
382   if constants.NV_FILELIST in what:
383     result[constants.NV_FILELIST] = utils.FingerprintFiles(
384       what[constants.NV_FILELIST])
385
386   if constants.NV_NODELIST in what:
387     result[constants.NV_NODELIST] = tmp = {}
388     random.shuffle(what[constants.NV_NODELIST])
389     for node in what[constants.NV_NODELIST]:
390       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
391       if not success:
392         tmp[node] = message
393
394   if constants.NV_NODENETTEST in what:
395     result[constants.NV_NODENETTEST] = tmp = {}
396     my_name = utils.HostInfo().name
397     my_pip = my_sip = None
398     for name, pip, sip in what[constants.NV_NODENETTEST]:
399       if name == my_name:
400         my_pip = pip
401         my_sip = sip
402         break
403     if not my_pip:
404       tmp[my_name] = ("Can't find my own primary/secondary IP"
405                       " in the node list")
406     else:
407       port = utils.GetNodeDaemonPort()
408       for name, pip, sip in what[constants.NV_NODENETTEST]:
409         fail = []
410         if not utils.TcpPing(pip, port, source=my_pip):
411           fail.append("primary")
412         if sip != pip:
413           if not utils.TcpPing(sip, port, source=my_sip):
414             fail.append("secondary")
415         if fail:
416           tmp[name] = ("failure using the %s interface(s)" %
417                        " and ".join(fail))
418
419   if constants.NV_LVLIST in what:
420     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
421
422   if constants.NV_INSTANCELIST in what:
423     result[constants.NV_INSTANCELIST] = GetInstanceList(
424       what[constants.NV_INSTANCELIST])
425
426   if constants.NV_VGLIST in what:
427     result[constants.NV_VGLIST] = ListVolumeGroups()
428
429   if constants.NV_VERSION in what:
430     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
431                                     constants.RELEASE_VERSION)
432
433   if constants.NV_HVINFO in what:
434     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
435     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
436
437   if constants.NV_DRBDLIST in what:
438     try:
439       used_minors = bdev.DRBD8.GetUsedDevs().keys()
440     except errors.BlockDeviceError:
441       logging.warning("Can't get used minors list", exc_info=True)
442       used_minors = []
443     result[constants.NV_DRBDLIST] = used_minors
444
445   return result
446
447
448 def GetVolumeList(vg_name):
449   """Compute list of logical volumes and their size.
450
451   @type vg_name: str
452   @param vg_name: the volume group whose LVs we should list
453   @rtype: dict
454   @return:
455       dictionary of all partions (key) with value being a tuple of
456       their size (in MiB), inactive and online status::
457
458         {'test1': ('20.06', True, True)}
459
460       in case of errors, a string is returned with the error
461       details.
462
463   """
464   lvs = {}
465   sep = '|'
466   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
467                          "--separator=%s" % sep,
468                          "-olv_name,lv_size,lv_attr", vg_name])
469   if result.failed:
470     logging.error("Failed to list logical volumes, lvs output: %s",
471                   result.output)
472     return result.output
473
474   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
475   for line in result.stdout.splitlines():
476     line = line.strip()
477     match = valid_line_re.match(line)
478     if not match:
479       logging.error("Invalid line returned from lvs output: '%s'", line)
480       continue
481     name, size, attr = match.groups()
482     inactive = attr[4] == '-'
483     online = attr[5] == 'o'
484     lvs[name] = (size, inactive, online)
485
486   return lvs
487
488
489 def ListVolumeGroups():
490   """List the volume groups and their size.
491
492   @rtype: dict
493   @return: dictionary with keys volume name and values the
494       size of the volume
495
496   """
497   return utils.ListVolumeGroups()
498
499
500 def NodeVolumes():
501   """List all volumes on this node.
502
503   @rtype: list
504   @return:
505     A list of dictionaries, each having four keys:
506       - name: the logical volume name,
507       - size: the size of the logical volume
508       - dev: the physical device on which the LV lives
509       - vg: the volume group to which it belongs
510
511     In case of errors, we return an empty list and log the
512     error.
513
514     Note that since a logical volume can live on multiple physical
515     volumes, the resulting list might include a logical volume
516     multiple times.
517
518   """
519   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
520                          "--separator=|",
521                          "--options=lv_name,lv_size,devices,vg_name"])
522   if result.failed:
523     logging.error("Failed to list logical volumes, lvs output: %s",
524                   result.output)
525     return []
526
527   def parse_dev(dev):
528     if '(' in dev:
529       return dev.split('(')[0]
530     else:
531       return dev
532
533   def map_line(line):
534     return {
535       'name': line[0].strip(),
536       'size': line[1].strip(),
537       'dev': parse_dev(line[2].strip()),
538       'vg': line[3].strip(),
539     }
540
541   return [map_line(line.split('|')) for line in result.stdout.splitlines()
542           if line.count('|') >= 3]
543
544
545 def BridgesExist(bridges_list):
546   """Check if a list of bridges exist on the current node.
547
548   @rtype: boolean
549   @return: C{True} if all of them exist, C{False} otherwise
550
551   """
552   for bridge in bridges_list:
553     if not utils.BridgeExists(bridge):
554       return False
555
556   return True
557
558
559 def GetInstanceList(hypervisor_list):
560   """Provides a list of instances.
561
562   @type hypervisor_list: list
563   @param hypervisor_list: the list of hypervisors to query information
564
565   @rtype: list
566   @return: a list of all running instances on the current node
567     - instance1.example.com
568     - instance2.example.com
569
570   """
571   results = []
572   for hname in hypervisor_list:
573     try:
574       names = hypervisor.GetHypervisor(hname).ListInstances()
575       results.extend(names)
576     except errors.HypervisorError, err:
577       logging.exception("Error enumerating instances for hypevisor %s", hname)
578       # FIXME: should we somehow not propagate this to the master?
579       raise
580
581   return results
582
583
584 def GetInstanceInfo(instance, hname):
585   """Gives back the informations about an instance as a dictionary.
586
587   @type instance: string
588   @param instance: the instance name
589   @type hname: string
590   @param hname: the hypervisor type of the instance
591
592   @rtype: dict
593   @return: dictionary with the following keys:
594       - memory: memory size of instance (int)
595       - state: xen state of instance (string)
596       - time: cpu time of instance (float)
597
598   """
599   output = {}
600
601   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
602   if iinfo is not None:
603     output['memory'] = iinfo[2]
604     output['state'] = iinfo[4]
605     output['time'] = iinfo[5]
606
607   return output
608
609
610 def GetInstanceMigratable(instance):
611   """Gives whether an instance can be migrated.
612
613   @type instance: L{objects.Instance}
614   @param instance: object representing the instance to be checked.
615
616   @rtype: tuple
617   @return: tuple of (result, description) where:
618       - result: whether the instance can be migrated or not
619       - description: a description of the issue, if relevant
620
621   """
622   hyper = hypervisor.GetHypervisor(instance.hypervisor)
623   if instance.name not in hyper.ListInstances():
624     return (False, 'not running')
625
626   for idx in range(len(instance.disks)):
627     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
628     if not os.path.islink(link_name):
629       return (False, 'not restarted since ganeti 1.2.5')
630
631   return (True, '')
632
633
634 def GetAllInstancesInfo(hypervisor_list):
635   """Gather data about all instances.
636
637   This is the equivalent of L{GetInstanceInfo}, except that it
638   computes data for all instances at once, thus being faster if one
639   needs data about more than one instance.
640
641   @type hypervisor_list: list
642   @param hypervisor_list: list of hypervisors to query for instance data
643
644   @rtype: dict
645   @return: dictionary of instance: data, with data having the following keys:
646       - memory: memory size of instance (int)
647       - state: xen state of instance (string)
648       - time: cpu time of instance (float)
649       - vcpus: the number of vcpus
650
651   """
652   output = {}
653
654   for hname in hypervisor_list:
655     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
656     if iinfo:
657       for name, inst_id, memory, vcpus, state, times in iinfo:
658         value = {
659           'memory': memory,
660           'vcpus': vcpus,
661           'state': state,
662           'time': times,
663           }
664         if name in output and output[name] != value:
665           raise errors.HypervisorError("Instance %s running duplicate"
666                                        " with different parameters" % name)
667         output[name] = value
668
669   return output
670
671
672 def InstanceOsAdd(instance):
673   """Add an OS to an instance.
674
675   @type instance: L{objects.Instance}
676   @param instance: Instance whose OS is to be installed
677   @rtype: boolean
678   @return: the success of the operation
679
680   """
681   try:
682     inst_os = OSFromDisk(instance.os)
683   except errors.InvalidOS, err:
684     os_name, os_dir, os_err = err.args
685     if os_dir is None:
686       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
687     else:
688       return (False, "Error parsing OS '%s' in directory %s: %s" %
689               (os_name, os_dir, os_err))
690
691   create_env = OSEnvironment(instance)
692
693   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
694                                      instance.name, int(time.time()))
695
696   result = utils.RunCmd([inst_os.create_script], env=create_env,
697                         cwd=inst_os.path, output=logfile,)
698   if result.failed:
699     logging.error("os create command '%s' returned error: %s, logfile: %s,"
700                   " output: %s", result.cmd, result.fail_reason, logfile,
701                   result.output)
702     lines = [utils.SafeEncode(val)
703              for val in utils.TailFile(logfile, lines=20)]
704     return (False, "OS create script failed (%s), last lines in the"
705             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
706
707   return (True, "Successfully installed")
708
709
710 def RunRenameInstance(instance, old_name):
711   """Run the OS rename script for an instance.
712
713   @type instance: L{objects.Instance}
714   @param instance: Instance whose OS is to be installed
715   @type old_name: string
716   @param old_name: previous instance name
717   @rtype: boolean
718   @return: the success of the operation
719
720   """
721   inst_os = OSFromDisk(instance.os)
722
723   rename_env = OSEnvironment(instance)
724   rename_env['OLD_INSTANCE_NAME'] = old_name
725
726   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
727                                            old_name,
728                                            instance.name, int(time.time()))
729
730   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
731                         cwd=inst_os.path, output=logfile)
732
733   if result.failed:
734     logging.error("os create command '%s' returned error: %s output: %s",
735                   result.cmd, result.fail_reason, result.output)
736     lines = [utils.SafeEncode(val)
737              for val in utils.TailFile(logfile, lines=20)]
738     return (False, "OS rename script failed (%s), last lines in the"
739             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
740
741   return (True, "Rename successful")
742
743
744 def _GetVGInfo(vg_name):
745   """Get informations about the volume group.
746
747   @type vg_name: str
748   @param vg_name: the volume group which we query
749   @rtype: dict
750   @return:
751     A dictionary with the following keys:
752       - C{vg_size} is the total size of the volume group in MiB
753       - C{vg_free} is the free size of the volume group in MiB
754       - C{pv_count} are the number of physical disks in that VG
755
756     If an error occurs during gathering of data, we return the same dict
757     with keys all set to None.
758
759   """
760   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
761
762   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
763                          "--nosuffix", "--units=m", "--separator=:", vg_name])
764
765   if retval.failed:
766     logging.error("volume group %s not present", vg_name)
767     return retdic
768   valarr = retval.stdout.strip().rstrip(':').split(':')
769   if len(valarr) == 3:
770     try:
771       retdic = {
772         "vg_size": int(round(float(valarr[0]), 0)),
773         "vg_free": int(round(float(valarr[1]), 0)),
774         "pv_count": int(valarr[2]),
775         }
776     except ValueError, err:
777       logging.exception("Fail to parse vgs output")
778   else:
779     logging.error("vgs output has the wrong number of fields (expected"
780                   " three): %s", str(valarr))
781   return retdic
782
783
784 def _GetBlockDevSymlinkPath(instance_name, idx):
785   return os.path.join(constants.DISK_LINKS_DIR,
786                       "%s:%d" % (instance_name, idx))
787
788
789 def _SymlinkBlockDev(instance_name, device_path, idx):
790   """Set up symlinks to a instance's block device.
791
792   This is an auxiliary function run when an instance is start (on the primary
793   node) or when an instance is migrated (on the target node).
794
795
796   @param instance_name: the name of the target instance
797   @param device_path: path of the physical block device, on the node
798   @param idx: the disk index
799   @return: absolute path to the disk's symlink
800
801   """
802   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
803   try:
804     os.symlink(device_path, link_name)
805   except OSError, err:
806     if err.errno == errno.EEXIST:
807       if (not os.path.islink(link_name) or
808           os.readlink(link_name) != device_path):
809         os.remove(link_name)
810         os.symlink(device_path, link_name)
811     else:
812       raise
813
814   return link_name
815
816
817 def _RemoveBlockDevLinks(instance_name, disks):
818   """Remove the block device symlinks belonging to the given instance.
819
820   """
821   for idx, disk in enumerate(disks):
822     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
823     if os.path.islink(link_name):
824       try:
825         os.remove(link_name)
826       except OSError:
827         logging.exception("Can't remove symlink '%s'", link_name)
828
829
830 def _GatherAndLinkBlockDevs(instance):
831   """Set up an instance's block device(s).
832
833   This is run on the primary node at instance startup. The block
834   devices must be already assembled.
835
836   @type instance: L{objects.Instance}
837   @param instance: the instance whose disks we shoul assemble
838   @rtype: list
839   @return: list of (disk_object, device_path)
840
841   """
842   block_devices = []
843   for idx, disk in enumerate(instance.disks):
844     device = _RecursiveFindBD(disk)
845     if device is None:
846       raise errors.BlockDeviceError("Block device '%s' is not set up." %
847                                     str(disk))
848     device.Open()
849     try:
850       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
851     except OSError, e:
852       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
853                                     e.strerror)
854
855     block_devices.append((disk, link_name))
856
857   return block_devices
858
859
860 def StartInstance(instance, extra_args):
861   """Start an instance.
862
863   @type instance: L{objects.Instance}
864   @param instance: the instance object
865   @rtype: boolean
866   @return: whether the startup was successful or not
867
868   """
869   running_instances = GetInstanceList([instance.hypervisor])
870
871   if instance.name in running_instances:
872     return (True, "Already running")
873
874   try:
875     block_devices = _GatherAndLinkBlockDevs(instance)
876     hyper = hypervisor.GetHypervisor(instance.hypervisor)
877     hyper.StartInstance(instance, block_devices, extra_args)
878   except errors.BlockDeviceError, err:
879     logging.exception("Failed to start instance")
880     return (False, "Block device error: %s" % str(err))
881   except errors.HypervisorError, err:
882     logging.exception("Failed to start instance")
883     _RemoveBlockDevLinks(instance.name, instance.disks)
884     return (False, "Hypervisor error: %s" % str(err))
885
886   return (True, "Instance started successfully")
887
888
889 def ShutdownInstance(instance):
890   """Shut an instance down.
891
892   @note: this functions uses polling with a hardcoded timeout.
893
894   @type instance: L{objects.Instance}
895   @param instance: the instance object
896   @rtype: boolean
897   @return: whether the startup was successful or not
898
899   """
900   hv_name = instance.hypervisor
901   running_instances = GetInstanceList([hv_name])
902
903   if instance.name not in running_instances:
904     return True
905
906   hyper = hypervisor.GetHypervisor(hv_name)
907   try:
908     hyper.StopInstance(instance)
909   except errors.HypervisorError, err:
910     logging.error("Failed to stop instance: %s" % err)
911     return False
912
913   # test every 10secs for 2min
914
915   time.sleep(1)
916   for dummy in range(11):
917     if instance.name not in GetInstanceList([hv_name]):
918       break
919     time.sleep(10)
920   else:
921     # the shutdown did not succeed
922     logging.error("Shutdown of '%s' unsuccessful, using destroy",
923                   instance.name)
924
925     try:
926       hyper.StopInstance(instance, force=True)
927     except errors.HypervisorError, err:
928       logging.exception("Failed to stop instance: %s" % err)
929       return False
930
931     time.sleep(1)
932     if instance.name in GetInstanceList([hv_name]):
933       logging.error("Could not shutdown instance '%s' even by destroy",
934                     instance.name)
935       return False
936
937   _RemoveBlockDevLinks(instance.name, instance.disks)
938
939   return True
940
941
942 def RebootInstance(instance, reboot_type, extra_args):
943   """Reboot an instance.
944
945   @type instance: L{objects.Instance}
946   @param instance: the instance object to reboot
947   @type reboot_type: str
948   @param reboot_type: the type of reboot, one the following
949     constants:
950       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
951         instance OS, do not recreate the VM
952       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
953         restart the VM (at the hypervisor level)
954       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
955         is not accepted here, since that mode is handled
956         differently
957   @rtype: boolean
958   @return: the success of the operation
959
960   """
961   running_instances = GetInstanceList([instance.hypervisor])
962
963   if instance.name not in running_instances:
964     logging.error("Cannot reboot instance that is not running")
965     return False
966
967   hyper = hypervisor.GetHypervisor(instance.hypervisor)
968   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
969     try:
970       hyper.RebootInstance(instance)
971     except errors.HypervisorError, err:
972       logging.exception("Failed to soft reboot instance")
973       return False
974   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
975     try:
976       ShutdownInstance(instance)
977       StartInstance(instance, extra_args)
978     except errors.HypervisorError, err:
979       logging.exception("Failed to hard reboot instance")
980       return False
981   else:
982     raise errors.ParameterError("reboot_type invalid")
983
984   return True
985
986
987 def MigrationInfo(instance):
988   """Gather information about an instance to be migrated.
989
990   @type instance: L{objects.Instance}
991   @param instance: the instance definition
992
993   """
994   hyper = hypervisor.GetHypervisor(instance.hypervisor)
995   try:
996     info = hyper.MigrationInfo(instance)
997   except errors.HypervisorError, err:
998     msg = "Failed to fetch migration information"
999     logging.exception(msg)
1000     return (False, '%s: %s' % (msg, err))
1001   return (True, info)
1002
1003
1004 def AcceptInstance(instance, info, target):
1005   """Prepare the node to accept an instance.
1006
1007   @type instance: L{objects.Instance}
1008   @param instance: the instance definition
1009   @type info: string/data (opaque)
1010   @param info: migration information, from the source node
1011   @type target: string
1012   @param target: target host (usually ip), on this node
1013
1014   """
1015   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1016   try:
1017     hyper.AcceptInstance(instance, info, target)
1018   except errors.HypervisorError, err:
1019     msg = "Failed to accept instance"
1020     logging.exception(msg)
1021     return (False, '%s: %s' % (msg, err))
1022   return (True, "Accept successfull")
1023
1024
1025 def FinalizeMigration(instance, info, success):
1026   """Finalize any preparation to accept an instance.
1027
1028   @type instance: L{objects.Instance}
1029   @param instance: the instance definition
1030   @type info: string/data (opaque)
1031   @param info: migration information, from the source node
1032   @type success: boolean
1033   @param success: whether the migration was a success or a failure
1034
1035   """
1036   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1037   try:
1038     hyper.FinalizeMigration(instance, info, success)
1039   except errors.HypervisorError, err:
1040     msg = "Failed to finalize migration"
1041     logging.exception(msg)
1042     return (False, '%s: %s' % (msg, err))
1043   return (True, "Migration Finalized")
1044
1045
1046 def MigrateInstance(instance, target, live):
1047   """Migrates an instance to another node.
1048
1049   @type instance: L{objects.Instance}
1050   @param instance: the instance definition
1051   @type target: string
1052   @param target: the target node name
1053   @type live: boolean
1054   @param live: whether the migration should be done live or not (the
1055       interpretation of this parameter is left to the hypervisor)
1056   @rtype: tuple
1057   @return: a tuple of (success, msg) where:
1058       - succes is a boolean denoting the success/failure of the operation
1059       - msg is a string with details in case of failure
1060
1061   """
1062   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1063
1064   try:
1065     hyper.MigrateInstance(instance.name, target, live)
1066   except errors.HypervisorError, err:
1067     msg = "Failed to migrate instance"
1068     logging.exception(msg)
1069     return (False, "%s: %s" % (msg, err))
1070   return (True, "Migration successfull")
1071
1072
1073 def BlockdevCreate(disk, size, owner, on_primary, info):
1074   """Creates a block device for an instance.
1075
1076   @type disk: L{objects.Disk}
1077   @param disk: the object describing the disk we should create
1078   @type size: int
1079   @param size: the size of the physical underlying device, in MiB
1080   @type owner: str
1081   @param owner: the name of the instance for which disk is created,
1082       used for device cache data
1083   @type on_primary: boolean
1084   @param on_primary:  indicates if it is the primary node or not
1085   @type info: string
1086   @param info: string that will be sent to the physical device
1087       creation, used for example to set (LVM) tags on LVs
1088
1089   @return: the new unique_id of the device (this can sometime be
1090       computed only after creation), or None. On secondary nodes,
1091       it's not required to return anything.
1092
1093   """
1094   clist = []
1095   if disk.children:
1096     for child in disk.children:
1097       crdev = _RecursiveAssembleBD(child, owner, on_primary)
1098       if on_primary or disk.AssembleOnSecondary():
1099         # we need the children open in case the device itself has to
1100         # be assembled
1101         crdev.Open()
1102       clist.append(crdev)
1103
1104   try:
1105     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1106   except errors.GenericError, err:
1107     return False, "Can't create block device: %s" % str(err)
1108
1109   if on_primary or disk.AssembleOnSecondary():
1110     if not device.Assemble():
1111       errorstring = "Can't assemble device after creation, very unusual event"
1112       logging.error(errorstring)
1113       return False, errorstring
1114     device.SetSyncSpeed(constants.SYNC_SPEED)
1115     if on_primary or disk.OpenOnSecondary():
1116       device.Open(force=True)
1117     DevCacheManager.UpdateCache(device.dev_path, owner,
1118                                 on_primary, disk.iv_name)
1119
1120   device.SetInfo(info)
1121
1122   physical_id = device.unique_id
1123   return True, physical_id
1124
1125
1126 def BlockdevRemove(disk):
1127   """Remove a block device.
1128
1129   @note: This is intended to be called recursively.
1130
1131   @type disk: L{objects.Disk}
1132   @param disk: the disk object we should remove
1133   @rtype: boolean
1134   @return: the success of the operation
1135
1136   """
1137   try:
1138     rdev = _RecursiveFindBD(disk)
1139   except errors.BlockDeviceError, err:
1140     # probably can't attach
1141     logging.info("Can't attach to device %s in remove", disk)
1142     rdev = None
1143   if rdev is not None:
1144     r_path = rdev.dev_path
1145     result = rdev.Remove()
1146     if result:
1147       DevCacheManager.RemoveCache(r_path)
1148   else:
1149     result = True
1150   if disk.children:
1151     for child in disk.children:
1152       result = result and BlockdevRemove(child)
1153   return result
1154
1155
1156 def _RecursiveAssembleBD(disk, owner, as_primary):
1157   """Activate a block device for an instance.
1158
1159   This is run on the primary and secondary nodes for an instance.
1160
1161   @note: this function is called recursively.
1162
1163   @type disk: L{objects.Disk}
1164   @param disk: the disk we try to assemble
1165   @type owner: str
1166   @param owner: the name of the instance which owns the disk
1167   @type as_primary: boolean
1168   @param as_primary: if we should make the block device
1169       read/write
1170
1171   @return: the assembled device or None (in case no device
1172       was assembled)
1173   @raise errors.BlockDeviceError: in case there is an error
1174       during the activation of the children or the device
1175       itself
1176
1177   """
1178   children = []
1179   if disk.children:
1180     mcn = disk.ChildrenNeeded()
1181     if mcn == -1:
1182       mcn = 0 # max number of Nones allowed
1183     else:
1184       mcn = len(disk.children) - mcn # max number of Nones
1185     for chld_disk in disk.children:
1186       try:
1187         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1188       except errors.BlockDeviceError, err:
1189         if children.count(None) >= mcn:
1190           raise
1191         cdev = None
1192         logging.error("Error in child activation: %s", str(err))
1193       children.append(cdev)
1194
1195   if as_primary or disk.AssembleOnSecondary():
1196     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1197     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1198     result = r_dev
1199     if as_primary or disk.OpenOnSecondary():
1200       r_dev.Open()
1201     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1202                                 as_primary, disk.iv_name)
1203
1204   else:
1205     result = True
1206   return result
1207
1208
1209 def BlockdevAssemble(disk, owner, as_primary):
1210   """Activate a block device for an instance.
1211
1212   This is a wrapper over _RecursiveAssembleBD.
1213
1214   @rtype: str or boolean
1215   @return: a C{/dev/...} path for primary nodes, and
1216       C{True} for secondary nodes
1217
1218   """
1219   status = False
1220   result = "no error information"
1221   try:
1222     result = _RecursiveAssembleBD(disk, owner, as_primary)
1223     if isinstance(result, bdev.BlockDev):
1224       result = result.dev_path
1225       status = True
1226     if result == True:
1227       status = True
1228   except errors.BlockDeviceError, err:
1229     result = "Error while assembling disk: %s" % str(err)
1230   return (status, result)
1231
1232
1233 def BlockdevShutdown(disk):
1234   """Shut down a block device.
1235
1236   First, if the device is assembled (Attach() is successfull), then
1237   the device is shutdown. Then the children of the device are
1238   shutdown.
1239
1240   This function is called recursively. Note that we don't cache the
1241   children or such, as oppossed to assemble, shutdown of different
1242   devices doesn't require that the upper device was active.
1243
1244   @type disk: L{objects.Disk}
1245   @param disk: the description of the disk we should
1246       shutdown
1247   @rtype: boolean
1248   @return: the success of the operation
1249
1250   """
1251   msgs = []
1252   result = True
1253   r_dev = _RecursiveFindBD(disk)
1254   if r_dev is not None:
1255     r_path = r_dev.dev_path
1256     try:
1257       r_dev.Shutdown()
1258       DevCacheManager.RemoveCache(r_path)
1259     except errors.BlockDeviceError, err:
1260       msgs.append(str(err))
1261       result = False
1262
1263   if disk.children:
1264     for child in disk.children:
1265       c_status, c_msg = BlockdevShutdown(child)
1266       result = result and c_status
1267       if c_msg: # not an empty message
1268         msgs.append(c_msg)
1269
1270   return (result, "; ".join(msgs))
1271
1272
1273 def BlockdevAddchildren(parent_cdev, new_cdevs):
1274   """Extend a mirrored block device.
1275
1276   @type parent_cdev: L{objects.Disk}
1277   @param parent_cdev: the disk to which we should add children
1278   @type new_cdevs: list of L{objects.Disk}
1279   @param new_cdevs: the list of children which we should add
1280   @rtype: boolean
1281   @return: the success of the operation
1282
1283   """
1284   parent_bdev = _RecursiveFindBD(parent_cdev)
1285   if parent_bdev is None:
1286     logging.error("Can't find parent device")
1287     return False
1288   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1289   if new_bdevs.count(None) > 0:
1290     logging.error("Can't find new device(s) to add: %s:%s",
1291                   new_bdevs, new_cdevs)
1292     return False
1293   parent_bdev.AddChildren(new_bdevs)
1294   return True
1295
1296
1297 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1298   """Shrink a mirrored block device.
1299
1300   @type parent_cdev: L{objects.Disk}
1301   @param parent_cdev: the disk from which we should remove children
1302   @type new_cdevs: list of L{objects.Disk}
1303   @param new_cdevs: the list of children which we should remove
1304   @rtype: boolean
1305   @return: the success of the operation
1306
1307   """
1308   parent_bdev = _RecursiveFindBD(parent_cdev)
1309   if parent_bdev is None:
1310     logging.error("Can't find parent in remove children: %s", parent_cdev)
1311     return False
1312   devs = []
1313   for disk in new_cdevs:
1314     rpath = disk.StaticDevPath()
1315     if rpath is None:
1316       bd = _RecursiveFindBD(disk)
1317       if bd is None:
1318         logging.error("Can't find dynamic device %s while removing children",
1319                       disk)
1320         return False
1321       else:
1322         devs.append(bd.dev_path)
1323     else:
1324       devs.append(rpath)
1325   parent_bdev.RemoveChildren(devs)
1326   return True
1327
1328
1329 def BlockdevGetmirrorstatus(disks):
1330   """Get the mirroring status of a list of devices.
1331
1332   @type disks: list of L{objects.Disk}
1333   @param disks: the list of disks which we should query
1334   @rtype: disk
1335   @return:
1336       a list of (mirror_done, estimated_time) tuples, which
1337       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1338   @raise errors.BlockDeviceError: if any of the disks cannot be
1339       found
1340
1341   """
1342   stats = []
1343   for dsk in disks:
1344     rbd = _RecursiveFindBD(dsk)
1345     if rbd is None:
1346       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1347     stats.append(rbd.CombinedSyncStatus())
1348   return stats
1349
1350
1351 def _RecursiveFindBD(disk):
1352   """Check if a device is activated.
1353
1354   If so, return informations about the real device.
1355
1356   @type disk: L{objects.Disk}
1357   @param disk: the disk object we need to find
1358
1359   @return: None if the device can't be found,
1360       otherwise the device instance
1361
1362   """
1363   children = []
1364   if disk.children:
1365     for chdisk in disk.children:
1366       children.append(_RecursiveFindBD(chdisk))
1367
1368   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1369
1370
1371 def BlockdevFind(disk):
1372   """Check if a device is activated.
1373
1374   If it is, return informations about the real device.
1375
1376   @type disk: L{objects.Disk}
1377   @param disk: the disk to find
1378   @rtype: None or tuple
1379   @return: None if the disk cannot be found, otherwise a
1380       tuple (device_path, major, minor, sync_percent,
1381       estimated_time, is_degraded)
1382
1383   """
1384   try:
1385     rbd = _RecursiveFindBD(disk)
1386   except errors.BlockDeviceError, err:
1387     return (False, str(err))
1388   if rbd is None:
1389     return (True, None)
1390   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1391
1392
1393 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1394   """Write a file to the filesystem.
1395
1396   This allows the master to overwrite(!) a file. It will only perform
1397   the operation if the file belongs to a list of configuration files.
1398
1399   @type file_name: str
1400   @param file_name: the target file name
1401   @type data: str
1402   @param data: the new contents of the file
1403   @type mode: int
1404   @param mode: the mode to give the file (can be None)
1405   @type uid: int
1406   @param uid: the owner of the file (can be -1 for default)
1407   @type gid: int
1408   @param gid: the group of the file (can be -1 for default)
1409   @type atime: float
1410   @param atime: the atime to set on the file (can be None)
1411   @type mtime: float
1412   @param mtime: the mtime to set on the file (can be None)
1413   @rtype: boolean
1414   @return: the success of the operation; errors are logged
1415       in the node daemon log
1416
1417   """
1418   if not os.path.isabs(file_name):
1419     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1420                   file_name)
1421     return False
1422
1423   allowed_files = [
1424     constants.CLUSTER_CONF_FILE,
1425     constants.ETC_HOSTS,
1426     constants.SSH_KNOWN_HOSTS_FILE,
1427     constants.VNC_PASSWORD_FILE,
1428     ]
1429
1430   if file_name not in allowed_files:
1431     logging.error("Filename passed to UploadFile not in allowed"
1432                  " upload targets: '%s'", file_name)
1433     return False
1434
1435   raw_data = _Decompress(data)
1436
1437   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1438                   atime=atime, mtime=mtime)
1439   return True
1440
1441
1442 def WriteSsconfFiles(values):
1443   """Update all ssconf files.
1444
1445   Wrapper around the SimpleStore.WriteFiles.
1446
1447   """
1448   ssconf.SimpleStore().WriteFiles(values)
1449
1450
1451 def _ErrnoOrStr(err):
1452   """Format an EnvironmentError exception.
1453
1454   If the L{err} argument has an errno attribute, it will be looked up
1455   and converted into a textual C{E...} description. Otherwise the
1456   string representation of the error will be returned.
1457
1458   @type err: L{EnvironmentError}
1459   @param err: the exception to format
1460
1461   """
1462   if hasattr(err, 'errno'):
1463     detail = errno.errorcode[err.errno]
1464   else:
1465     detail = str(err)
1466   return detail
1467
1468
1469 def _OSOndiskVersion(name, os_dir):
1470   """Compute and return the API version of a given OS.
1471
1472   This function will try to read the API version of the OS given by
1473   the 'name' parameter and residing in the 'os_dir' directory.
1474
1475   @type name: str
1476   @param name: the OS name we should look for
1477   @type os_dir: str
1478   @param os_dir: the directory inwhich we should look for the OS
1479   @rtype: int or None
1480   @return:
1481       Either an integer denoting the version or None in the
1482       case when this is not a valid OS name.
1483   @raise errors.InvalidOS: if the OS cannot be found
1484
1485   """
1486   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1487
1488   try:
1489     st = os.stat(api_file)
1490   except EnvironmentError, err:
1491     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1492                            " found (%s)" % _ErrnoOrStr(err))
1493
1494   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1495     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1496                            " a regular file")
1497
1498   try:
1499     f = open(api_file)
1500     try:
1501       api_versions = f.readlines()
1502     finally:
1503       f.close()
1504   except EnvironmentError, err:
1505     raise errors.InvalidOS(name, os_dir, "error while reading the"
1506                            " API version (%s)" % _ErrnoOrStr(err))
1507
1508   api_versions = [version.strip() for version in api_versions]
1509   try:
1510     api_versions = [int(version) for version in api_versions]
1511   except (TypeError, ValueError), err:
1512     raise errors.InvalidOS(name, os_dir,
1513                            "API version is not integer (%s)" % str(err))
1514
1515   return api_versions
1516
1517
1518 def DiagnoseOS(top_dirs=None):
1519   """Compute the validity for all OSes.
1520
1521   @type top_dirs: list
1522   @param top_dirs: the list of directories in which to
1523       search (if not given defaults to
1524       L{constants.OS_SEARCH_PATH})
1525   @rtype: list of L{objects.OS}
1526   @return: an OS object for each name in all the given
1527       directories
1528
1529   """
1530   if top_dirs is None:
1531     top_dirs = constants.OS_SEARCH_PATH
1532
1533   result = []
1534   for dir_name in top_dirs:
1535     if os.path.isdir(dir_name):
1536       try:
1537         f_names = utils.ListVisibleFiles(dir_name)
1538       except EnvironmentError, err:
1539         logging.exception("Can't list the OS directory %s", dir_name)
1540         break
1541       for name in f_names:
1542         try:
1543           os_inst = OSFromDisk(name, base_dir=dir_name)
1544           result.append(os_inst)
1545         except errors.InvalidOS, err:
1546           result.append(objects.OS.FromInvalidOS(err))
1547
1548   return result
1549
1550
1551 def OSFromDisk(name, base_dir=None):
1552   """Create an OS instance from disk.
1553
1554   This function will return an OS instance if the given name is a
1555   valid OS name. Otherwise, it will raise an appropriate
1556   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1557
1558   @type base_dir: string
1559   @keyword base_dir: Base directory containing OS installations.
1560                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1561   @rtype: L{objects.OS}
1562   @return: the OS instance if we find a valid one
1563   @raise errors.InvalidOS: if we don't find a valid OS
1564
1565   """
1566   if base_dir is None:
1567     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1568     if os_dir is None:
1569       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1570   else:
1571     os_dir = os.path.sep.join([base_dir, name])
1572
1573   api_versions = _OSOndiskVersion(name, os_dir)
1574
1575   if constants.OS_API_VERSION not in api_versions:
1576     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1577                            " (found %s want %s)"
1578                            % (api_versions, constants.OS_API_VERSION))
1579
1580   # OS Scripts dictionary, we will populate it with the actual script names
1581   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1582
1583   for script in os_scripts:
1584     os_scripts[script] = os.path.sep.join([os_dir, script])
1585
1586     try:
1587       st = os.stat(os_scripts[script])
1588     except EnvironmentError, err:
1589       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1590                              (script, _ErrnoOrStr(err)))
1591
1592     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1593       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1594                              script)
1595
1596     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1597       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1598                              script)
1599
1600
1601   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1602                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1603                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1604                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1605                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1606                     api_versions=api_versions)
1607
1608 def OSEnvironment(instance, debug=0):
1609   """Calculate the environment for an os script.
1610
1611   @type instance: L{objects.Instance}
1612   @param instance: target instance for the os script run
1613   @type debug: integer
1614   @param debug: debug level (0 or 1, for OS Api 10)
1615   @rtype: dict
1616   @return: dict of environment variables
1617   @raise errors.BlockDeviceError: if the block device
1618       cannot be found
1619
1620   """
1621   result = {}
1622   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1623   result['INSTANCE_NAME'] = instance.name
1624   result['INSTANCE_OS'] = instance.os
1625   result['HYPERVISOR'] = instance.hypervisor
1626   result['DISK_COUNT'] = '%d' % len(instance.disks)
1627   result['NIC_COUNT'] = '%d' % len(instance.nics)
1628   result['DEBUG_LEVEL'] = '%d' % debug
1629   for idx, disk in enumerate(instance.disks):
1630     real_disk = _RecursiveFindBD(disk)
1631     if real_disk is None:
1632       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1633                                     str(disk))
1634     real_disk.Open()
1635     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1636     # FIXME: When disks will have read-only mode, populate this
1637     result['DISK_%d_ACCESS' % idx] = disk.mode
1638     if constants.HV_DISK_TYPE in instance.hvparams:
1639       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1640         instance.hvparams[constants.HV_DISK_TYPE]
1641     if disk.dev_type in constants.LDS_BLOCK:
1642       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1643     elif disk.dev_type == constants.LD_FILE:
1644       result['DISK_%d_BACKEND_TYPE' % idx] = \
1645         'file:%s' % disk.physical_id[0]
1646   for idx, nic in enumerate(instance.nics):
1647     result['NIC_%d_MAC' % idx] = nic.mac
1648     if nic.ip:
1649       result['NIC_%d_IP' % idx] = nic.ip
1650     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1651     if constants.HV_NIC_TYPE in instance.hvparams:
1652       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1653         instance.hvparams[constants.HV_NIC_TYPE]
1654
1655   return result
1656
1657 def BlockdevGrow(disk, amount):
1658   """Grow a stack of block devices.
1659
1660   This function is called recursively, with the childrens being the
1661   first ones to resize.
1662
1663   @type disk: L{objects.Disk}
1664   @param disk: the disk to be grown
1665   @rtype: (status, result)
1666   @return: a tuple with the status of the operation
1667       (True/False), and the errors message if status
1668       is False
1669
1670   """
1671   r_dev = _RecursiveFindBD(disk)
1672   if r_dev is None:
1673     return False, "Cannot find block device %s" % (disk,)
1674
1675   try:
1676     r_dev.Grow(amount)
1677   except errors.BlockDeviceError, err:
1678     return False, str(err)
1679
1680   return True, None
1681
1682
1683 def BlockdevSnapshot(disk):
1684   """Create a snapshot copy of a block device.
1685
1686   This function is called recursively, and the snapshot is actually created
1687   just for the leaf lvm backend device.
1688
1689   @type disk: L{objects.Disk}
1690   @param disk: the disk to be snapshotted
1691   @rtype: string
1692   @return: snapshot disk path
1693
1694   """
1695   if disk.children:
1696     if len(disk.children) == 1:
1697       # only one child, let's recurse on it
1698       return BlockdevSnapshot(disk.children[0])
1699     else:
1700       # more than one child, choose one that matches
1701       for child in disk.children:
1702         if child.size == disk.size:
1703           # return implies breaking the loop
1704           return BlockdevSnapshot(child)
1705   elif disk.dev_type == constants.LD_LV:
1706     r_dev = _RecursiveFindBD(disk)
1707     if r_dev is not None:
1708       # let's stay on the safe side and ask for the full size, for now
1709       return r_dev.Snapshot(disk.size)
1710     else:
1711       return None
1712   else:
1713     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1714                                  " '%s' of type '%s'" %
1715                                  (disk.unique_id, disk.dev_type))
1716
1717
1718 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1719   """Export a block device snapshot to a remote node.
1720
1721   @type disk: L{objects.Disk}
1722   @param disk: the description of the disk to export
1723   @type dest_node: str
1724   @param dest_node: the destination node to export to
1725   @type instance: L{objects.Instance}
1726   @param instance: the instance object to whom the disk belongs
1727   @type cluster_name: str
1728   @param cluster_name: the cluster name, needed for SSH hostalias
1729   @type idx: int
1730   @param idx: the index of the disk in the instance's disk list,
1731       used to export to the OS scripts environment
1732   @rtype: boolean
1733   @return: the success of the operation
1734
1735   """
1736   export_env = OSEnvironment(instance)
1737
1738   inst_os = OSFromDisk(instance.os)
1739   export_script = inst_os.export_script
1740
1741   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1742                                      instance.name, int(time.time()))
1743   if not os.path.exists(constants.LOG_OS_DIR):
1744     os.mkdir(constants.LOG_OS_DIR, 0750)
1745   real_disk = _RecursiveFindBD(disk)
1746   if real_disk is None:
1747     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1748                                   str(disk))
1749   real_disk.Open()
1750
1751   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1752   export_env['EXPORT_INDEX'] = str(idx)
1753
1754   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1755   destfile = disk.physical_id[1]
1756
1757   # the target command is built out of three individual commands,
1758   # which are joined by pipes; we check each individual command for
1759   # valid parameters
1760   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1761                                export_script, logfile)
1762
1763   comprcmd = "gzip"
1764
1765   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1766                                 destdir, destdir, destfile)
1767   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1768                                                    constants.GANETI_RUNAS,
1769                                                    destcmd)
1770
1771   # all commands have been checked, so we're safe to combine them
1772   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1773
1774   result = utils.RunCmd(command, env=export_env)
1775
1776   if result.failed:
1777     logging.error("os snapshot export command '%s' returned error: %s"
1778                   " output: %s", command, result.fail_reason, result.output)
1779     return False
1780
1781   return True
1782
1783
1784 def FinalizeExport(instance, snap_disks):
1785   """Write out the export configuration information.
1786
1787   @type instance: L{objects.Instance}
1788   @param instance: the instance which we export, used for
1789       saving configuration
1790   @type snap_disks: list of L{objects.Disk}
1791   @param snap_disks: list of snapshot block devices, which
1792       will be used to get the actual name of the dump file
1793
1794   @rtype: boolean
1795   @return: the success of the operation
1796
1797   """
1798   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1799   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1800
1801   config = objects.SerializableConfigParser()
1802
1803   config.add_section(constants.INISECT_EXP)
1804   config.set(constants.INISECT_EXP, 'version', '0')
1805   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1806   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1807   config.set(constants.INISECT_EXP, 'os', instance.os)
1808   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1809
1810   config.add_section(constants.INISECT_INS)
1811   config.set(constants.INISECT_INS, 'name', instance.name)
1812   config.set(constants.INISECT_INS, 'memory', '%d' %
1813              instance.beparams[constants.BE_MEMORY])
1814   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1815              instance.beparams[constants.BE_VCPUS])
1816   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1817
1818   nic_total = 0
1819   for nic_count, nic in enumerate(instance.nics):
1820     nic_total += 1
1821     config.set(constants.INISECT_INS, 'nic%d_mac' %
1822                nic_count, '%s' % nic.mac)
1823     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1824     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1825                '%s' % nic.bridge)
1826   # TODO: redundant: on load can read nics until it doesn't exist
1827   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1828
1829   disk_total = 0
1830   for disk_count, disk in enumerate(snap_disks):
1831     if disk:
1832       disk_total += 1
1833       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1834                  ('%s' % disk.iv_name))
1835       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1836                  ('%s' % disk.physical_id[1]))
1837       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1838                  ('%d' % disk.size))
1839
1840   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1841
1842   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1843                   data=config.Dumps())
1844   shutil.rmtree(finaldestdir, True)
1845   shutil.move(destdir, finaldestdir)
1846
1847   return True
1848
1849
1850 def ExportInfo(dest):
1851   """Get export configuration information.
1852
1853   @type dest: str
1854   @param dest: directory containing the export
1855
1856   @rtype: L{objects.SerializableConfigParser}
1857   @return: a serializable config file containing the
1858       export info
1859
1860   """
1861   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1862
1863   config = objects.SerializableConfigParser()
1864   config.read(cff)
1865
1866   if (not config.has_section(constants.INISECT_EXP) or
1867       not config.has_section(constants.INISECT_INS)):
1868     return None
1869
1870   return config
1871
1872
1873 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1874   """Import an os image into an instance.
1875
1876   @type instance: L{objects.Instance}
1877   @param instance: instance to import the disks into
1878   @type src_node: string
1879   @param src_node: source node for the disk images
1880   @type src_images: list of string
1881   @param src_images: absolute paths of the disk images
1882   @rtype: list of boolean
1883   @return: each boolean represent the success of importing the n-th disk
1884
1885   """
1886   import_env = OSEnvironment(instance)
1887   inst_os = OSFromDisk(instance.os)
1888   import_script = inst_os.import_script
1889
1890   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1891                                         instance.name, int(time.time()))
1892   if not os.path.exists(constants.LOG_OS_DIR):
1893     os.mkdir(constants.LOG_OS_DIR, 0750)
1894
1895   comprcmd = "gunzip"
1896   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1897                                import_script, logfile)
1898
1899   final_result = []
1900   for idx, image in enumerate(src_images):
1901     if image:
1902       destcmd = utils.BuildShellCmd('cat %s', image)
1903       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1904                                                        constants.GANETI_RUNAS,
1905                                                        destcmd)
1906       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1907       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1908       import_env['IMPORT_INDEX'] = str(idx)
1909       result = utils.RunCmd(command, env=import_env)
1910       if result.failed:
1911         logging.error("Disk import command '%s' returned error: %s"
1912                       " output: %s", command, result.fail_reason,
1913                       result.output)
1914         final_result.append(False)
1915       else:
1916         final_result.append(True)
1917     else:
1918       final_result.append(True)
1919
1920   return final_result
1921
1922
1923 def ListExports():
1924   """Return a list of exports currently available on this machine.
1925
1926   @rtype: list
1927   @return: list of the exports
1928
1929   """
1930   if os.path.isdir(constants.EXPORT_DIR):
1931     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1932   else:
1933     return []
1934
1935
1936 def RemoveExport(export):
1937   """Remove an existing export from the node.
1938
1939   @type export: str
1940   @param export: the name of the export to remove
1941   @rtype: boolean
1942   @return: the success of the operation
1943
1944   """
1945   target = os.path.join(constants.EXPORT_DIR, export)
1946
1947   shutil.rmtree(target)
1948   # TODO: catch some of the relevant exceptions and provide a pretty
1949   # error message if rmtree fails.
1950
1951   return True
1952
1953
1954 def BlockdevRename(devlist):
1955   """Rename a list of block devices.
1956
1957   @type devlist: list of tuples
1958   @param devlist: list of tuples of the form  (disk,
1959       new_logical_id, new_physical_id); disk is an
1960       L{objects.Disk} object describing the current disk,
1961       and new logical_id/physical_id is the name we
1962       rename it to
1963   @rtype: boolean
1964   @return: True if all renames succeeded, False otherwise
1965
1966   """
1967   result = True
1968   for disk, unique_id in devlist:
1969     dev = _RecursiveFindBD(disk)
1970     if dev is None:
1971       result = False
1972       continue
1973     try:
1974       old_rpath = dev.dev_path
1975       dev.Rename(unique_id)
1976       new_rpath = dev.dev_path
1977       if old_rpath != new_rpath:
1978         DevCacheManager.RemoveCache(old_rpath)
1979         # FIXME: we should add the new cache information here, like:
1980         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1981         # but we don't have the owner here - maybe parse from existing
1982         # cache? for now, we only lose lvm data when we rename, which
1983         # is less critical than DRBD or MD
1984     except errors.BlockDeviceError, err:
1985       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1986       result = False
1987   return result
1988
1989
1990 def _TransformFileStorageDir(file_storage_dir):
1991   """Checks whether given file_storage_dir is valid.
1992
1993   Checks wheter the given file_storage_dir is within the cluster-wide
1994   default file_storage_dir stored in SimpleStore. Only paths under that
1995   directory are allowed.
1996
1997   @type file_storage_dir: str
1998   @param file_storage_dir: the path to check
1999
2000   @return: the normalized path if valid, None otherwise
2001
2002   """
2003   cfg = _GetConfig()
2004   file_storage_dir = os.path.normpath(file_storage_dir)
2005   base_file_storage_dir = cfg.GetFileStorageDir()
2006   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2007       base_file_storage_dir):
2008     logging.error("file storage directory '%s' is not under base file"
2009                   " storage directory '%s'",
2010                   file_storage_dir, base_file_storage_dir)
2011     return None
2012   return file_storage_dir
2013
2014
2015 def CreateFileStorageDir(file_storage_dir):
2016   """Create file storage directory.
2017
2018   @type file_storage_dir: str
2019   @param file_storage_dir: directory to create
2020
2021   @rtype: tuple
2022   @return: tuple with first element a boolean indicating wheter dir
2023       creation was successful or not
2024
2025   """
2026   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2027   result = True,
2028   if not file_storage_dir:
2029     result = False,
2030   else:
2031     if os.path.exists(file_storage_dir):
2032       if not os.path.isdir(file_storage_dir):
2033         logging.error("'%s' is not a directory", file_storage_dir)
2034         result = False,
2035     else:
2036       try:
2037         os.makedirs(file_storage_dir, 0750)
2038       except OSError, err:
2039         logging.error("Cannot create file storage directory '%s': %s",
2040                       file_storage_dir, err)
2041         result = False,
2042   return result
2043
2044
2045 def RemoveFileStorageDir(file_storage_dir):
2046   """Remove file storage directory.
2047
2048   Remove it only if it's empty. If not log an error and return.
2049
2050   @type file_storage_dir: str
2051   @param file_storage_dir: the directory we should cleanup
2052   @rtype: tuple (success,)
2053   @return: tuple of one element, C{success}, denoting
2054       whether the operation was successfull
2055
2056   """
2057   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2058   result = True,
2059   if not file_storage_dir:
2060     result = False,
2061   else:
2062     if os.path.exists(file_storage_dir):
2063       if not os.path.isdir(file_storage_dir):
2064         logging.error("'%s' is not a directory", file_storage_dir)
2065         result = False,
2066       # deletes dir only if empty, otherwise we want to return False
2067       try:
2068         os.rmdir(file_storage_dir)
2069       except OSError, err:
2070         logging.exception("Cannot remove file storage directory '%s'",
2071                           file_storage_dir)
2072         result = False,
2073   return result
2074
2075
2076 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2077   """Rename the file storage directory.
2078
2079   @type old_file_storage_dir: str
2080   @param old_file_storage_dir: the current path
2081   @type new_file_storage_dir: str
2082   @param new_file_storage_dir: the name we should rename to
2083   @rtype: tuple (success,)
2084   @return: tuple of one element, C{success}, denoting
2085       whether the operation was successful
2086
2087   """
2088   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2089   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2090   result = True,
2091   if not old_file_storage_dir or not new_file_storage_dir:
2092     result = False,
2093   else:
2094     if not os.path.exists(new_file_storage_dir):
2095       if os.path.isdir(old_file_storage_dir):
2096         try:
2097           os.rename(old_file_storage_dir, new_file_storage_dir)
2098         except OSError, err:
2099           logging.exception("Cannot rename '%s' to '%s'",
2100                             old_file_storage_dir, new_file_storage_dir)
2101           result =  False,
2102       else:
2103         logging.error("'%s' is not a directory", old_file_storage_dir)
2104         result = False,
2105     else:
2106       if os.path.exists(old_file_storage_dir):
2107         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2108                       old_file_storage_dir, new_file_storage_dir)
2109         result = False,
2110   return result
2111
2112
2113 def _IsJobQueueFile(file_name):
2114   """Checks whether the given filename is in the queue directory.
2115
2116   @type file_name: str
2117   @param file_name: the file name we should check
2118   @rtype: boolean
2119   @return: whether the file is under the queue directory
2120
2121   """
2122   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2123   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2124
2125   if not result:
2126     logging.error("'%s' is not a file in the queue directory",
2127                   file_name)
2128
2129   return result
2130
2131
2132 def JobQueueUpdate(file_name, content):
2133   """Updates a file in the queue directory.
2134
2135   This is just a wrapper over L{utils.WriteFile}, with proper
2136   checking.
2137
2138   @type file_name: str
2139   @param file_name: the job file name
2140   @type content: str
2141   @param content: the new job contents
2142   @rtype: boolean
2143   @return: the success of the operation
2144
2145   """
2146   if not _IsJobQueueFile(file_name):
2147     return False
2148
2149   # Write and replace the file atomically
2150   utils.WriteFile(file_name, data=_Decompress(content))
2151
2152   return True
2153
2154
2155 def JobQueueRename(old, new):
2156   """Renames a job queue file.
2157
2158   This is just a wrapper over os.rename with proper checking.
2159
2160   @type old: str
2161   @param old: the old (actual) file name
2162   @type new: str
2163   @param new: the desired file name
2164   @rtype: boolean
2165   @return: the success of the operation
2166
2167   """
2168   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2169     return False
2170
2171   utils.RenameFile(old, new, mkdir=True)
2172
2173   return True
2174
2175
2176 def JobQueueSetDrainFlag(drain_flag):
2177   """Set the drain flag for the queue.
2178
2179   This will set or unset the queue drain flag.
2180
2181   @type drain_flag: boolean
2182   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2183   @rtype: boolean
2184   @return: always True
2185   @warning: the function always returns True
2186
2187   """
2188   if drain_flag:
2189     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2190   else:
2191     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2192
2193   return True
2194
2195
2196 def BlockdevClose(instance_name, disks):
2197   """Closes the given block devices.
2198
2199   This means they will be switched to secondary mode (in case of
2200   DRBD).
2201
2202   @param instance_name: if the argument is not empty, the symlinks
2203       of this instance will be removed
2204   @type disks: list of L{objects.Disk}
2205   @param disks: the list of disks to be closed
2206   @rtype: tuple (success, message)
2207   @return: a tuple of success and message, where success
2208       indicates the succes of the operation, and message
2209       which will contain the error details in case we
2210       failed
2211
2212   """
2213   bdevs = []
2214   for cf in disks:
2215     rd = _RecursiveFindBD(cf)
2216     if rd is None:
2217       return (False, "Can't find device %s" % cf)
2218     bdevs.append(rd)
2219
2220   msg = []
2221   for rd in bdevs:
2222     try:
2223       rd.Close()
2224     except errors.BlockDeviceError, err:
2225       msg.append(str(err))
2226   if msg:
2227     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2228   else:
2229     if instance_name:
2230       _RemoveBlockDevLinks(instance_name, disks)
2231     return (True, "All devices secondary")
2232
2233
2234 def ValidateHVParams(hvname, hvparams):
2235   """Validates the given hypervisor parameters.
2236
2237   @type hvname: string
2238   @param hvname: the hypervisor name
2239   @type hvparams: dict
2240   @param hvparams: the hypervisor parameters to be validated
2241   @rtype: tuple (success, message)
2242   @return: a tuple of success and message, where success
2243       indicates the succes of the operation, and message
2244       which will contain the error details in case we
2245       failed
2246
2247   """
2248   try:
2249     hv_type = hypervisor.GetHypervisor(hvname)
2250     hv_type.ValidateParameters(hvparams)
2251     return (True, "Validation passed")
2252   except errors.HypervisorError, err:
2253     return (False, str(err))
2254
2255
2256 def DemoteFromMC():
2257   """Demotes the current node from master candidate role.
2258
2259   """
2260   # try to ensure we're not the master by mistake
2261   master, myself = ssconf.GetMasterAndMyself()
2262   if master == myself:
2263     return (False, "ssconf status shows I'm the master node, will not demote")
2264   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2265   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2266     return (False, "The master daemon is running, will not demote")
2267   try:
2268     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2269   except EnvironmentError, err:
2270     if err.errno != errno.ENOENT:
2271       return (False, "Error while backing up cluster file: %s" % str(err))
2272   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2273   return (True, "Done")
2274
2275
2276 def _FindDisks(nodes_ip, disks):
2277   """Sets the physical ID on disks and returns the block devices.
2278
2279   """
2280   # set the correct physical ID
2281   my_name = utils.HostInfo().name
2282   for cf in disks:
2283     cf.SetPhysicalID(my_name, nodes_ip)
2284
2285   bdevs = []
2286
2287   for cf in disks:
2288     rd = _RecursiveFindBD(cf)
2289     if rd is None:
2290       return (False, "Can't find device %s" % cf)
2291     bdevs.append(rd)
2292   return (True, bdevs)
2293
2294
2295 def DrbdDisconnectNet(nodes_ip, disks):
2296   """Disconnects the network on a list of drbd devices.
2297
2298   """
2299   status, bdevs = _FindDisks(nodes_ip, disks)
2300   if not status:
2301     return status, bdevs
2302
2303   # disconnect disks
2304   for rd in bdevs:
2305     try:
2306       rd.DisconnectNet()
2307     except errors.BlockDeviceError, err:
2308       logging.exception("Failed to go into standalone mode")
2309       return (False, "Can't change network configuration: %s" % str(err))
2310   return (True, "All disks are now disconnected")
2311
2312
2313 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2314   """Attaches the network on a list of drbd devices.
2315
2316   """
2317   status, bdevs = _FindDisks(nodes_ip, disks)
2318   if not status:
2319     return status, bdevs
2320
2321   if multimaster:
2322     for idx, rd in enumerate(bdevs):
2323       try:
2324         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2325       except EnvironmentError, err:
2326         return (False, "Can't create symlink: %s" % str(err))
2327   # reconnect disks, switch to new master configuration and if
2328   # needed primary mode
2329   for rd in bdevs:
2330     try:
2331       rd.AttachNet(multimaster)
2332     except errors.BlockDeviceError, err:
2333       return (False, "Can't change network configuration: %s" % str(err))
2334   # wait until the disks are connected; we need to retry the re-attach
2335   # if the device becomes standalone, as this might happen if the one
2336   # node disconnects and reconnects in a different mode before the
2337   # other node reconnects; in this case, one or both of the nodes will
2338   # decide it has wrong configuration and switch to standalone
2339   RECONNECT_TIMEOUT = 2 * 60
2340   sleep_time = 0.100 # start with 100 miliseconds
2341   timeout_limit = time.time() + RECONNECT_TIMEOUT
2342   while time.time() < timeout_limit:
2343     all_connected = True
2344     for rd in bdevs:
2345       stats = rd.GetProcStatus()
2346       if not (stats.is_connected or stats.is_in_resync):
2347         all_connected = False
2348       if stats.is_standalone:
2349         # peer had different config info and this node became
2350         # standalone, even though this should not happen with the
2351         # new staged way of changing disk configs
2352         try:
2353           rd.ReAttachNet(multimaster)
2354         except errors.BlockDeviceError, err:
2355           return (False, "Can't change network configuration: %s" % str(err))
2356     if all_connected:
2357       break
2358     time.sleep(sleep_time)
2359     sleep_time = min(5, sleep_time * 1.5)
2360   if not all_connected:
2361     return (False, "Timeout in disk reconnecting")
2362   if multimaster:
2363     # change to primary mode
2364     for rd in bdevs:
2365       try:
2366         rd.Open()
2367       except errors.BlockDeviceError, err:
2368         return (False, "Can't change to primary mode: %s" % str(err))
2369   if multimaster:
2370     msg = "multi-master and primary"
2371   else:
2372     msg = "single-master"
2373   return (True, "Disks are now configured as %s" % msg)
2374
2375
2376 def DrbdWaitSync(nodes_ip, disks):
2377   """Wait until DRBDs have synchronized.
2378
2379   """
2380   status, bdevs = _FindDisks(nodes_ip, disks)
2381   if not status:
2382     return status, bdevs
2383
2384   min_resync = 100
2385   alldone = True
2386   failure = False
2387   for rd in bdevs:
2388     stats = rd.GetProcStatus()
2389     if not (stats.is_connected or stats.is_in_resync):
2390       failure = True
2391       break
2392     alldone = alldone and (not stats.is_in_resync)
2393     if stats.sync_percent is not None:
2394       min_resync = min(min_resync, stats.sync_percent)
2395   return (not failure, (alldone, min_resync))
2396
2397
2398 class HooksRunner(object):
2399   """Hook runner.
2400
2401   This class is instantiated on the node side (ganeti-noded) and not
2402   on the master side.
2403
2404   """
2405   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2406
2407   def __init__(self, hooks_base_dir=None):
2408     """Constructor for hooks runner.
2409
2410     @type hooks_base_dir: str or None
2411     @param hooks_base_dir: if not None, this overrides the
2412         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2413
2414     """
2415     if hooks_base_dir is None:
2416       hooks_base_dir = constants.HOOKS_BASE_DIR
2417     self._BASE_DIR = hooks_base_dir
2418
2419   @staticmethod
2420   def ExecHook(script, env):
2421     """Exec one hook script.
2422
2423     @type script: str
2424     @param script: the full path to the script
2425     @type env: dict
2426     @param env: the environment with which to exec the script
2427     @rtype: tuple (success, message)
2428     @return: a tuple of success and message, where success
2429         indicates the succes of the operation, and message
2430         which will contain the error details in case we
2431         failed
2432
2433     """
2434     # exec the process using subprocess and log the output
2435     fdstdin = None
2436     try:
2437       fdstdin = open("/dev/null", "r")
2438       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2439                                stderr=subprocess.STDOUT, close_fds=True,
2440                                shell=False, cwd="/", env=env)
2441       output = ""
2442       try:
2443         output = child.stdout.read(4096)
2444         child.stdout.close()
2445       except EnvironmentError, err:
2446         output += "Hook script error: %s" % str(err)
2447
2448       while True:
2449         try:
2450           result = child.wait()
2451           break
2452         except EnvironmentError, err:
2453           if err.errno == errno.EINTR:
2454             continue
2455           raise
2456     finally:
2457       # try not to leak fds
2458       for fd in (fdstdin, ):
2459         if fd is not None:
2460           try:
2461             fd.close()
2462           except EnvironmentError, err:
2463             # just log the error
2464             #logging.exception("Error while closing fd %s", fd)
2465             pass
2466
2467     return result == 0, utils.SafeEncode(output.strip())
2468
2469   def RunHooks(self, hpath, phase, env):
2470     """Run the scripts in the hooks directory.
2471
2472     @type hpath: str
2473     @param hpath: the path to the hooks directory which
2474         holds the scripts
2475     @type phase: str
2476     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2477         L{constants.HOOKS_PHASE_POST}
2478     @type env: dict
2479     @param env: dictionary with the environment for the hook
2480     @rtype: list
2481     @return: list of 3-element tuples:
2482       - script path
2483       - script result, either L{constants.HKR_SUCCESS} or
2484         L{constants.HKR_FAIL}
2485       - output of the script
2486
2487     @raise errors.ProgrammerError: for invalid input
2488         parameters
2489
2490     """
2491     if phase == constants.HOOKS_PHASE_PRE:
2492       suffix = "pre"
2493     elif phase == constants.HOOKS_PHASE_POST:
2494       suffix = "post"
2495     else:
2496       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2497     rr = []
2498
2499     subdir = "%s-%s.d" % (hpath, suffix)
2500     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2501     try:
2502       dir_contents = utils.ListVisibleFiles(dir_name)
2503     except OSError, err:
2504       # FIXME: must log output in case of failures
2505       return rr
2506
2507     # we use the standard python sort order,
2508     # so 00name is the recommended naming scheme
2509     dir_contents.sort()
2510     for relname in dir_contents:
2511       fname = os.path.join(dir_name, relname)
2512       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2513           self.RE_MASK.match(relname) is not None):
2514         rrval = constants.HKR_SKIP
2515         output = ""
2516       else:
2517         result, output = self.ExecHook(fname, env)
2518         if not result:
2519           rrval = constants.HKR_FAIL
2520         else:
2521           rrval = constants.HKR_SUCCESS
2522       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2523
2524     return rr
2525
2526
2527 class IAllocatorRunner(object):
2528   """IAllocator runner.
2529
2530   This class is instantiated on the node side (ganeti-noded) and not on
2531   the master side.
2532
2533   """
2534   def Run(self, name, idata):
2535     """Run an iallocator script.
2536
2537     @type name: str
2538     @param name: the iallocator script name
2539     @type idata: str
2540     @param idata: the allocator input data
2541
2542     @rtype: tuple
2543     @return: four element tuple of:
2544        - run status (one of the IARUN_ constants)
2545        - stdout
2546        - stderr
2547        - fail reason (as from L{utils.RunResult})
2548
2549     """
2550     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2551                                   os.path.isfile)
2552     if alloc_script is None:
2553       return (constants.IARUN_NOTFOUND, None, None, None)
2554
2555     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2556     try:
2557       os.write(fd, idata)
2558       os.close(fd)
2559       result = utils.RunCmd([alloc_script, fin_name])
2560       if result.failed:
2561         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2562                 result.fail_reason)
2563     finally:
2564       os.unlink(fin_name)
2565
2566     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2567
2568
2569 class DevCacheManager(object):
2570   """Simple class for managing a cache of block device information.
2571
2572   """
2573   _DEV_PREFIX = "/dev/"
2574   _ROOT_DIR = constants.BDEV_CACHE_DIR
2575
2576   @classmethod
2577   def _ConvertPath(cls, dev_path):
2578     """Converts a /dev/name path to the cache file name.
2579
2580     This replaces slashes with underscores and strips the /dev
2581     prefix. It then returns the full path to the cache file.
2582
2583     @type dev_path: str
2584     @param dev_path: the C{/dev/} path name
2585     @rtype: str
2586     @return: the converted path name
2587
2588     """
2589     if dev_path.startswith(cls._DEV_PREFIX):
2590       dev_path = dev_path[len(cls._DEV_PREFIX):]
2591     dev_path = dev_path.replace("/", "_")
2592     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2593     return fpath
2594
2595   @classmethod
2596   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2597     """Updates the cache information for a given device.
2598
2599     @type dev_path: str
2600     @param dev_path: the pathname of the device
2601     @type owner: str
2602     @param owner: the owner (instance name) of the device
2603     @type on_primary: bool
2604     @param on_primary: whether this is the primary
2605         node nor not
2606     @type iv_name: str
2607     @param iv_name: the instance-visible name of the
2608         device, as in objects.Disk.iv_name
2609
2610     @rtype: None
2611
2612     """
2613     if dev_path is None:
2614       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2615       return
2616     fpath = cls._ConvertPath(dev_path)
2617     if on_primary:
2618       state = "primary"
2619     else:
2620       state = "secondary"
2621     if iv_name is None:
2622       iv_name = "not_visible"
2623     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2624     try:
2625       utils.WriteFile(fpath, data=fdata)
2626     except EnvironmentError, err:
2627       logging.exception("Can't update bdev cache for %s", dev_path)
2628
2629   @classmethod
2630   def RemoveCache(cls, dev_path):
2631     """Remove data for a dev_path.
2632
2633     This is just a wrapper over L{utils.RemoveFile} with a converted
2634     path name and logging.
2635
2636     @type dev_path: str
2637     @param dev_path: the pathname of the device
2638
2639     @rtype: None
2640
2641     """
2642     if dev_path is None:
2643       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2644       return
2645     fpath = cls._ConvertPath(dev_path)
2646     try:
2647       utils.RemoveFile(fpath)
2648     except EnvironmentError, err:
2649       logging.exception("Can't update bdev cache for %s", dev_path)