Add a few more checks to verify config
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     msg = "Error while processing user ssh files"
264     logging.exception(msg)
265     return (False, "%s: %s" % (msg, err))
266
267   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
268     utils.WriteFile(name, data=content, mode=0600)
269
270   utils.AddAuthorizedKey(auth_keys, sshpub)
271
272   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
273
274   return (True, "Node added successfully")
275
276
277 def LeaveCluster():
278   """Cleans up and remove the current node.
279
280   This function cleans up and prepares the current node to be removed
281   from the cluster.
282
283   If processing is successful, then it raises an
284   L{errors.QuitGanetiException} which is used as a special case to
285   shutdown the node daemon.
286
287   """
288   _CleanDirectory(constants.DATA_DIR)
289   JobQueuePurge()
290
291   try:
292     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
293   except errors.OpExecError:
294     logging.exception("Error while processing ssh files")
295     return
296
297   f = open(pub_key, 'r')
298   try:
299     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
300   finally:
301     f.close()
302
303   utils.RemoveFile(priv_key)
304   utils.RemoveFile(pub_key)
305
306   # Return a reassuring string to the caller, and quit
307   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
308
309
310 def GetNodeInfo(vgname, hypervisor_type):
311   """Gives back a hash with different information about the node.
312
313   @type vgname: C{string}
314   @param vgname: the name of the volume group to ask for disk space information
315   @type hypervisor_type: C{str}
316   @param hypervisor_type: the name of the hypervisor to ask for
317       memory information
318   @rtype: C{dict}
319   @return: dictionary with the following keys:
320       - vg_size is the size of the configured volume group in MiB
321       - vg_free is the free size of the volume group in MiB
322       - memory_dom0 is the memory allocated for domain0 in MiB
323       - memory_free is the currently available (free) ram in MiB
324       - memory_total is the total number of ram in MiB
325
326   """
327   outputarray = {}
328   vginfo = _GetVGInfo(vgname)
329   outputarray['vg_size'] = vginfo['vg_size']
330   outputarray['vg_free'] = vginfo['vg_free']
331
332   hyper = hypervisor.GetHypervisor(hypervisor_type)
333   hyp_info = hyper.GetNodeInfo()
334   if hyp_info is not None:
335     outputarray.update(hyp_info)
336
337   f = open("/proc/sys/kernel/random/boot_id", 'r')
338   try:
339     outputarray["bootid"] = f.read(128).rstrip("\n")
340   finally:
341     f.close()
342
343   return outputarray
344
345
346 def VerifyNode(what, cluster_name):
347   """Verify the status of the local node.
348
349   Based on the input L{what} parameter, various checks are done on the
350   local node.
351
352   If the I{filelist} key is present, this list of
353   files is checksummed and the file/checksum pairs are returned.
354
355   If the I{nodelist} key is present, we check that we have
356   connectivity via ssh with the target nodes (and check the hostname
357   report).
358
359   If the I{node-net-test} key is present, we check that we have
360   connectivity to the given nodes via both primary IP and, if
361   applicable, secondary IPs.
362
363   @type what: C{dict}
364   @param what: a dictionary of things to check:
365       - filelist: list of files for which to compute checksums
366       - nodelist: list of nodes we should check ssh communication with
367       - node-net-test: list of nodes we should check node daemon port
368         connectivity with
369       - hypervisor: list with hypervisors to run the verify for
370   @rtype: dict
371   @return: a dictionary with the same keys as the input dict, and
372       values representing the result of the checks
373
374   """
375   result = {}
376
377   if constants.NV_HYPERVISOR in what:
378     result[constants.NV_HYPERVISOR] = tmp = {}
379     for hv_name in what[constants.NV_HYPERVISOR]:
380       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
381
382   if constants.NV_FILELIST in what:
383     result[constants.NV_FILELIST] = utils.FingerprintFiles(
384       what[constants.NV_FILELIST])
385
386   if constants.NV_NODELIST in what:
387     result[constants.NV_NODELIST] = tmp = {}
388     random.shuffle(what[constants.NV_NODELIST])
389     for node in what[constants.NV_NODELIST]:
390       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
391       if not success:
392         tmp[node] = message
393
394   if constants.NV_NODENETTEST in what:
395     result[constants.NV_NODENETTEST] = tmp = {}
396     my_name = utils.HostInfo().name
397     my_pip = my_sip = None
398     for name, pip, sip in what[constants.NV_NODENETTEST]:
399       if name == my_name:
400         my_pip = pip
401         my_sip = sip
402         break
403     if not my_pip:
404       tmp[my_name] = ("Can't find my own primary/secondary IP"
405                       " in the node list")
406     else:
407       port = utils.GetNodeDaemonPort()
408       for name, pip, sip in what[constants.NV_NODENETTEST]:
409         fail = []
410         if not utils.TcpPing(pip, port, source=my_pip):
411           fail.append("primary")
412         if sip != pip:
413           if not utils.TcpPing(sip, port, source=my_sip):
414             fail.append("secondary")
415         if fail:
416           tmp[name] = ("failure using the %s interface(s)" %
417                        " and ".join(fail))
418
419   if constants.NV_LVLIST in what:
420     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
421
422   if constants.NV_INSTANCELIST in what:
423     result[constants.NV_INSTANCELIST] = GetInstanceList(
424       what[constants.NV_INSTANCELIST])
425
426   if constants.NV_VGLIST in what:
427     result[constants.NV_VGLIST] = ListVolumeGroups()
428
429   if constants.NV_VERSION in what:
430     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
431                                     constants.RELEASE_VERSION)
432
433   if constants.NV_HVINFO in what:
434     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
435     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
436
437   if constants.NV_DRBDLIST in what:
438     try:
439       used_minors = bdev.DRBD8.GetUsedDevs().keys()
440     except errors.BlockDeviceError, err:
441       logging.warning("Can't get used minors list", exc_info=True)
442       used_minors = str(err)
443     result[constants.NV_DRBDLIST] = used_minors
444
445   return result
446
447
448 def GetVolumeList(vg_name):
449   """Compute list of logical volumes and their size.
450
451   @type vg_name: str
452   @param vg_name: the volume group whose LVs we should list
453   @rtype: dict
454   @return:
455       dictionary of all partions (key) with value being a tuple of
456       their size (in MiB), inactive and online status::
457
458         {'test1': ('20.06', True, True)}
459
460       in case of errors, a string is returned with the error
461       details.
462
463   """
464   lvs = {}
465   sep = '|'
466   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
467                          "--separator=%s" % sep,
468                          "-olv_name,lv_size,lv_attr", vg_name])
469   if result.failed:
470     logging.error("Failed to list logical volumes, lvs output: %s",
471                   result.output)
472     return result.output
473
474   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
475   for line in result.stdout.splitlines():
476     line = line.strip()
477     match = valid_line_re.match(line)
478     if not match:
479       logging.error("Invalid line returned from lvs output: '%s'", line)
480       continue
481     name, size, attr = match.groups()
482     inactive = attr[4] == '-'
483     online = attr[5] == 'o'
484     lvs[name] = (size, inactive, online)
485
486   return lvs
487
488
489 def ListVolumeGroups():
490   """List the volume groups and their size.
491
492   @rtype: dict
493   @return: dictionary with keys volume name and values the
494       size of the volume
495
496   """
497   return utils.ListVolumeGroups()
498
499
500 def NodeVolumes():
501   """List all volumes on this node.
502
503   @rtype: list
504   @return:
505     A list of dictionaries, each having four keys:
506       - name: the logical volume name,
507       - size: the size of the logical volume
508       - dev: the physical device on which the LV lives
509       - vg: the volume group to which it belongs
510
511     In case of errors, we return an empty list and log the
512     error.
513
514     Note that since a logical volume can live on multiple physical
515     volumes, the resulting list might include a logical volume
516     multiple times.
517
518   """
519   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
520                          "--separator=|",
521                          "--options=lv_name,lv_size,devices,vg_name"])
522   if result.failed:
523     logging.error("Failed to list logical volumes, lvs output: %s",
524                   result.output)
525     return []
526
527   def parse_dev(dev):
528     if '(' in dev:
529       return dev.split('(')[0]
530     else:
531       return dev
532
533   def map_line(line):
534     return {
535       'name': line[0].strip(),
536       'size': line[1].strip(),
537       'dev': parse_dev(line[2].strip()),
538       'vg': line[3].strip(),
539     }
540
541   return [map_line(line.split('|')) for line in result.stdout.splitlines()
542           if line.count('|') >= 3]
543
544
545 def BridgesExist(bridges_list):
546   """Check if a list of bridges exist on the current node.
547
548   @rtype: boolean
549   @return: C{True} if all of them exist, C{False} otherwise
550
551   """
552   for bridge in bridges_list:
553     if not utils.BridgeExists(bridge):
554       return False
555
556   return True
557
558
559 def GetInstanceList(hypervisor_list):
560   """Provides a list of instances.
561
562   @type hypervisor_list: list
563   @param hypervisor_list: the list of hypervisors to query information
564
565   @rtype: list
566   @return: a list of all running instances on the current node
567     - instance1.example.com
568     - instance2.example.com
569
570   """
571   results = []
572   for hname in hypervisor_list:
573     try:
574       names = hypervisor.GetHypervisor(hname).ListInstances()
575       results.extend(names)
576     except errors.HypervisorError:
577       logging.exception("Error enumerating instances for hypevisor %s", hname)
578       raise
579
580   return results
581
582
583 def GetInstanceInfo(instance, hname):
584   """Gives back the information about an instance as a dictionary.
585
586   @type instance: string
587   @param instance: the instance name
588   @type hname: string
589   @param hname: the hypervisor type of the instance
590
591   @rtype: dict
592   @return: dictionary with the following keys:
593       - memory: memory size of instance (int)
594       - state: xen state of instance (string)
595       - time: cpu time of instance (float)
596
597   """
598   output = {}
599
600   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
601   if iinfo is not None:
602     output['memory'] = iinfo[2]
603     output['state'] = iinfo[4]
604     output['time'] = iinfo[5]
605
606   return output
607
608
609 def GetInstanceMigratable(instance):
610   """Gives whether an instance can be migrated.
611
612   @type instance: L{objects.Instance}
613   @param instance: object representing the instance to be checked.
614
615   @rtype: tuple
616   @return: tuple of (result, description) where:
617       - result: whether the instance can be migrated or not
618       - description: a description of the issue, if relevant
619
620   """
621   hyper = hypervisor.GetHypervisor(instance.hypervisor)
622   if instance.name not in hyper.ListInstances():
623     return (False, 'not running')
624
625   for idx in range(len(instance.disks)):
626     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
627     if not os.path.islink(link_name):
628       return (False, 'not restarted since ganeti 1.2.5')
629
630   return (True, '')
631
632
633 def GetAllInstancesInfo(hypervisor_list):
634   """Gather data about all instances.
635
636   This is the equivalent of L{GetInstanceInfo}, except that it
637   computes data for all instances at once, thus being faster if one
638   needs data about more than one instance.
639
640   @type hypervisor_list: list
641   @param hypervisor_list: list of hypervisors to query for instance data
642
643   @rtype: dict
644   @return: dictionary of instance: data, with data having the following keys:
645       - memory: memory size of instance (int)
646       - state: xen state of instance (string)
647       - time: cpu time of instance (float)
648       - vcpus: the number of vcpus
649
650   """
651   output = {}
652
653   for hname in hypervisor_list:
654     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
655     if iinfo:
656       for name, inst_id, memory, vcpus, state, times in iinfo:
657         value = {
658           'memory': memory,
659           'vcpus': vcpus,
660           'state': state,
661           'time': times,
662           }
663         if name in output:
664           # we only check static parameters, like memory and vcpus,
665           # and not state and time which can change between the
666           # invocations of the different hypervisors
667           for key in 'memory', 'vcpus':
668             if value[key] != output[name][key]:
669               raise errors.HypervisorError("Instance %s is running twice"
670                                            " with different parameters" % name)
671         output[name] = value
672
673   return output
674
675
676 def InstanceOsAdd(instance):
677   """Add an OS to an instance.
678
679   @type instance: L{objects.Instance}
680   @param instance: Instance whose OS is to be installed
681   @rtype: boolean
682   @return: the success of the operation
683
684   """
685   try:
686     inst_os = OSFromDisk(instance.os)
687   except errors.InvalidOS, err:
688     os_name, os_dir, os_err = err.args
689     if os_dir is None:
690       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
691     else:
692       return (False, "Error parsing OS '%s' in directory %s: %s" %
693               (os_name, os_dir, os_err))
694
695   create_env = OSEnvironment(instance)
696
697   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
698                                      instance.name, int(time.time()))
699
700   result = utils.RunCmd([inst_os.create_script], env=create_env,
701                         cwd=inst_os.path, output=logfile,)
702   if result.failed:
703     logging.error("os create command '%s' returned error: %s, logfile: %s,"
704                   " output: %s", result.cmd, result.fail_reason, logfile,
705                   result.output)
706     lines = [utils.SafeEncode(val)
707              for val in utils.TailFile(logfile, lines=20)]
708     return (False, "OS create script failed (%s), last lines in the"
709             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
710
711   return (True, "Successfully installed")
712
713
714 def RunRenameInstance(instance, old_name):
715   """Run the OS rename script for an instance.
716
717   @type instance: L{objects.Instance}
718   @param instance: Instance whose OS is to be installed
719   @type old_name: string
720   @param old_name: previous instance name
721   @rtype: boolean
722   @return: the success of the operation
723
724   """
725   inst_os = OSFromDisk(instance.os)
726
727   rename_env = OSEnvironment(instance)
728   rename_env['OLD_INSTANCE_NAME'] = old_name
729
730   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
731                                            old_name,
732                                            instance.name, int(time.time()))
733
734   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
735                         cwd=inst_os.path, output=logfile)
736
737   if result.failed:
738     logging.error("os create command '%s' returned error: %s output: %s",
739                   result.cmd, result.fail_reason, result.output)
740     lines = [utils.SafeEncode(val)
741              for val in utils.TailFile(logfile, lines=20)]
742     return (False, "OS rename script failed (%s), last lines in the"
743             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
744
745   return (True, "Rename successful")
746
747
748 def _GetVGInfo(vg_name):
749   """Get information about the volume group.
750
751   @type vg_name: str
752   @param vg_name: the volume group which we query
753   @rtype: dict
754   @return:
755     A dictionary with the following keys:
756       - C{vg_size} is the total size of the volume group in MiB
757       - C{vg_free} is the free size of the volume group in MiB
758       - C{pv_count} are the number of physical disks in that VG
759
760     If an error occurs during gathering of data, we return the same dict
761     with keys all set to None.
762
763   """
764   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
765
766   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
767                          "--nosuffix", "--units=m", "--separator=:", vg_name])
768
769   if retval.failed:
770     logging.error("volume group %s not present", vg_name)
771     return retdic
772   valarr = retval.stdout.strip().rstrip(':').split(':')
773   if len(valarr) == 3:
774     try:
775       retdic = {
776         "vg_size": int(round(float(valarr[0]), 0)),
777         "vg_free": int(round(float(valarr[1]), 0)),
778         "pv_count": int(valarr[2]),
779         }
780     except ValueError, err:
781       logging.exception("Fail to parse vgs output")
782   else:
783     logging.error("vgs output has the wrong number of fields (expected"
784                   " three): %s", str(valarr))
785   return retdic
786
787
788 def _GetBlockDevSymlinkPath(instance_name, idx):
789   return os.path.join(constants.DISK_LINKS_DIR,
790                       "%s:%d" % (instance_name, idx))
791
792
793 def _SymlinkBlockDev(instance_name, device_path, idx):
794   """Set up symlinks to a instance's block device.
795
796   This is an auxiliary function run when an instance is start (on the primary
797   node) or when an instance is migrated (on the target node).
798
799
800   @param instance_name: the name of the target instance
801   @param device_path: path of the physical block device, on the node
802   @param idx: the disk index
803   @return: absolute path to the disk's symlink
804
805   """
806   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
807   try:
808     os.symlink(device_path, link_name)
809   except OSError, err:
810     if err.errno == errno.EEXIST:
811       if (not os.path.islink(link_name) or
812           os.readlink(link_name) != device_path):
813         os.remove(link_name)
814         os.symlink(device_path, link_name)
815     else:
816       raise
817
818   return link_name
819
820
821 def _RemoveBlockDevLinks(instance_name, disks):
822   """Remove the block device symlinks belonging to the given instance.
823
824   """
825   for idx, disk in enumerate(disks):
826     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
827     if os.path.islink(link_name):
828       try:
829         os.remove(link_name)
830       except OSError:
831         logging.exception("Can't remove symlink '%s'", link_name)
832
833
834 def _GatherAndLinkBlockDevs(instance):
835   """Set up an instance's block device(s).
836
837   This is run on the primary node at instance startup. The block
838   devices must be already assembled.
839
840   @type instance: L{objects.Instance}
841   @param instance: the instance whose disks we shoul assemble
842   @rtype: list
843   @return: list of (disk_object, device_path)
844
845   """
846   block_devices = []
847   for idx, disk in enumerate(instance.disks):
848     device = _RecursiveFindBD(disk)
849     if device is None:
850       raise errors.BlockDeviceError("Block device '%s' is not set up." %
851                                     str(disk))
852     device.Open()
853     try:
854       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
855     except OSError, e:
856       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
857                                     e.strerror)
858
859     block_devices.append((disk, link_name))
860
861   return block_devices
862
863
864 def StartInstance(instance):
865   """Start an instance.
866
867   @type instance: L{objects.Instance}
868   @param instance: the instance object
869   @rtype: boolean
870   @return: whether the startup was successful or not
871
872   """
873   running_instances = GetInstanceList([instance.hypervisor])
874
875   if instance.name in running_instances:
876     return (True, "Already running")
877
878   try:
879     block_devices = _GatherAndLinkBlockDevs(instance)
880     hyper = hypervisor.GetHypervisor(instance.hypervisor)
881     hyper.StartInstance(instance, block_devices)
882   except errors.BlockDeviceError, err:
883     logging.exception("Failed to start instance")
884     return (False, "Block device error: %s" % str(err))
885   except errors.HypervisorError, err:
886     logging.exception("Failed to start instance")
887     _RemoveBlockDevLinks(instance.name, instance.disks)
888     return (False, "Hypervisor error: %s" % str(err))
889
890   return (True, "Instance started successfully")
891
892
893 def InstanceShutdown(instance):
894   """Shut an instance down.
895
896   @note: this functions uses polling with a hardcoded timeout.
897
898   @type instance: L{objects.Instance}
899   @param instance: the instance object
900   @rtype: boolean
901   @return: whether the startup was successful or not
902
903   """
904   hv_name = instance.hypervisor
905   running_instances = GetInstanceList([hv_name])
906
907   if instance.name not in running_instances:
908     return (True, "Instance already stopped")
909
910   hyper = hypervisor.GetHypervisor(hv_name)
911   try:
912     hyper.StopInstance(instance)
913   except errors.HypervisorError, err:
914     msg = "Failed to stop instance %s: %s" % (instance.name, err)
915     logging.error(msg)
916     return (False, msg)
917
918   # test every 10secs for 2min
919
920   time.sleep(1)
921   for _ in range(11):
922     if instance.name not in GetInstanceList([hv_name]):
923       break
924     time.sleep(10)
925   else:
926     # the shutdown did not succeed
927     logging.error("Shutdown of '%s' unsuccessful, using destroy",
928                   instance.name)
929
930     try:
931       hyper.StopInstance(instance, force=True)
932     except errors.HypervisorError, err:
933       msg = "Failed to force stop instance %s: %s" % (instance.name, err)
934       logging.error(msg)
935       return (False, msg)
936
937     time.sleep(1)
938     if instance.name in GetInstanceList([hv_name]):
939       msg = ("Could not shutdown instance %s even by destroy" %
940              instance.name)
941       logging.error(msg)
942       return (False, msg)
943
944   _RemoveBlockDevLinks(instance.name, instance.disks)
945
946   return (True, "Instance has been shutdown successfully")
947
948
949 def InstanceReboot(instance, reboot_type):
950   """Reboot an instance.
951
952   @type instance: L{objects.Instance}
953   @param instance: the instance object to reboot
954   @type reboot_type: str
955   @param reboot_type: the type of reboot, one the following
956     constants:
957       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
958         instance OS, do not recreate the VM
959       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
960         restart the VM (at the hypervisor level)
961       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
962         not accepted here, since that mode is handled differently, in
963         cmdlib, and translates into full stop and start of the
964         instance (instead of a call_instance_reboot RPC)
965   @rtype: boolean
966   @return: the success of the operation
967
968   """
969   running_instances = GetInstanceList([instance.hypervisor])
970
971   if instance.name not in running_instances:
972     msg = "Cannot reboot instance %s that is not running" % instance.name
973     logging.error(msg)
974     return (False, msg)
975
976   hyper = hypervisor.GetHypervisor(instance.hypervisor)
977   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
978     try:
979       hyper.RebootInstance(instance)
980     except errors.HypervisorError, err:
981       msg = "Failed to soft reboot instance %s: %s" % (instance.name, err)
982       logging.error(msg)
983       return (False, msg)
984   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
985     try:
986       stop_result = InstanceShutdown(instance)
987       if not stop_result[0]:
988         return stop_result
989       return StartInstance(instance)
990     except errors.HypervisorError, err:
991       msg = "Failed to hard reboot instance %s: %s" % (instance.name, err)
992       logging.error(msg)
993       return (False, msg)
994   else:
995     return (False, "Invalid reboot_type received: %s" % (reboot_type,))
996
997   return (True, "Reboot successful")
998
999
1000 def MigrationInfo(instance):
1001   """Gather information about an instance to be migrated.
1002
1003   @type instance: L{objects.Instance}
1004   @param instance: the instance definition
1005
1006   """
1007   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1008   try:
1009     info = hyper.MigrationInfo(instance)
1010   except errors.HypervisorError, err:
1011     msg = "Failed to fetch migration information"
1012     logging.exception(msg)
1013     return (False, '%s: %s' % (msg, err))
1014   return (True, info)
1015
1016
1017 def AcceptInstance(instance, info, target):
1018   """Prepare the node to accept an instance.
1019
1020   @type instance: L{objects.Instance}
1021   @param instance: the instance definition
1022   @type info: string/data (opaque)
1023   @param info: migration information, from the source node
1024   @type target: string
1025   @param target: target host (usually ip), on this node
1026
1027   """
1028   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1029   try:
1030     hyper.AcceptInstance(instance, info, target)
1031   except errors.HypervisorError, err:
1032     msg = "Failed to accept instance"
1033     logging.exception(msg)
1034     return (False, '%s: %s' % (msg, err))
1035   return (True, "Accept successful")
1036
1037
1038 def FinalizeMigration(instance, info, success):
1039   """Finalize any preparation to accept an instance.
1040
1041   @type instance: L{objects.Instance}
1042   @param instance: the instance definition
1043   @type info: string/data (opaque)
1044   @param info: migration information, from the source node
1045   @type success: boolean
1046   @param success: whether the migration was a success or a failure
1047
1048   """
1049   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1050   try:
1051     hyper.FinalizeMigration(instance, info, success)
1052   except errors.HypervisorError, err:
1053     msg = "Failed to finalize migration"
1054     logging.exception(msg)
1055     return (False, '%s: %s' % (msg, err))
1056   return (True, "Migration Finalized")
1057
1058
1059 def MigrateInstance(instance, target, live):
1060   """Migrates an instance to another node.
1061
1062   @type instance: L{objects.Instance}
1063   @param instance: the instance definition
1064   @type target: string
1065   @param target: the target node name
1066   @type live: boolean
1067   @param live: whether the migration should be done live or not (the
1068       interpretation of this parameter is left to the hypervisor)
1069   @rtype: tuple
1070   @return: a tuple of (success, msg) where:
1071       - succes is a boolean denoting the success/failure of the operation
1072       - msg is a string with details in case of failure
1073
1074   """
1075   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1076
1077   try:
1078     hyper.MigrateInstance(instance.name, target, live)
1079   except errors.HypervisorError, err:
1080     msg = "Failed to migrate instance"
1081     logging.exception(msg)
1082     return (False, "%s: %s" % (msg, err))
1083   return (True, "Migration successful")
1084
1085
1086 def BlockdevCreate(disk, size, owner, on_primary, info):
1087   """Creates a block device for an instance.
1088
1089   @type disk: L{objects.Disk}
1090   @param disk: the object describing the disk we should create
1091   @type size: int
1092   @param size: the size of the physical underlying device, in MiB
1093   @type owner: str
1094   @param owner: the name of the instance for which disk is created,
1095       used for device cache data
1096   @type on_primary: boolean
1097   @param on_primary:  indicates if it is the primary node or not
1098   @type info: string
1099   @param info: string that will be sent to the physical device
1100       creation, used for example to set (LVM) tags on LVs
1101
1102   @return: the new unique_id of the device (this can sometime be
1103       computed only after creation), or None. On secondary nodes,
1104       it's not required to return anything.
1105
1106   """
1107   clist = []
1108   if disk.children:
1109     for child in disk.children:
1110       try:
1111         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1112       except errors.BlockDeviceError, err:
1113         errmsg = "Can't assemble device %s: %s" % (child, err)
1114         logging.error(errmsg)
1115         return False, errmsg
1116       if on_primary or disk.AssembleOnSecondary():
1117         # we need the children open in case the device itself has to
1118         # be assembled
1119         try:
1120           crdev.Open()
1121         except errors.BlockDeviceError, err:
1122           errmsg = "Can't make child '%s' read-write: %s" % (child, err)
1123           logging.error(errmsg)
1124           return False, errmsg
1125       clist.append(crdev)
1126
1127   try:
1128     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1129   except errors.BlockDeviceError, err:
1130     return False, "Can't create block device: %s" % str(err)
1131
1132   if on_primary or disk.AssembleOnSecondary():
1133     try:
1134       device.Assemble()
1135     except errors.BlockDeviceError, err:
1136       errmsg = ("Can't assemble device after creation, very"
1137                 " unusual event: %s" % str(err))
1138       logging.error(errmsg)
1139       return False, errmsg
1140     device.SetSyncSpeed(constants.SYNC_SPEED)
1141     if on_primary or disk.OpenOnSecondary():
1142       try:
1143         device.Open(force=True)
1144       except errors.BlockDeviceError, err:
1145         errmsg = ("Can't make device r/w after creation, very"
1146                   " unusual event: %s" % str(err))
1147         logging.error(errmsg)
1148         return False, errmsg
1149     DevCacheManager.UpdateCache(device.dev_path, owner,
1150                                 on_primary, disk.iv_name)
1151
1152   device.SetInfo(info)
1153
1154   physical_id = device.unique_id
1155   return True, physical_id
1156
1157
1158 def BlockdevRemove(disk):
1159   """Remove a block device.
1160
1161   @note: This is intended to be called recursively.
1162
1163   @type disk: L{objects.Disk}
1164   @param disk: the disk object we should remove
1165   @rtype: boolean
1166   @return: the success of the operation
1167
1168   """
1169   msgs = []
1170   result = True
1171   try:
1172     rdev = _RecursiveFindBD(disk)
1173   except errors.BlockDeviceError, err:
1174     # probably can't attach
1175     logging.info("Can't attach to device %s in remove", disk)
1176     rdev = None
1177   if rdev is not None:
1178     r_path = rdev.dev_path
1179     try:
1180       rdev.Remove()
1181     except errors.BlockDeviceError, err:
1182       msgs.append(str(err))
1183       result = False
1184     if result:
1185       DevCacheManager.RemoveCache(r_path)
1186
1187   if disk.children:
1188     for child in disk.children:
1189       c_status, c_msg = BlockdevRemove(child)
1190       result = result and c_status
1191       if c_msg: # not an empty message
1192         msgs.append(c_msg)
1193
1194   return (result, "; ".join(msgs))
1195
1196
1197 def _RecursiveAssembleBD(disk, owner, as_primary):
1198   """Activate a block device for an instance.
1199
1200   This is run on the primary and secondary nodes for an instance.
1201
1202   @note: this function is called recursively.
1203
1204   @type disk: L{objects.Disk}
1205   @param disk: the disk we try to assemble
1206   @type owner: str
1207   @param owner: the name of the instance which owns the disk
1208   @type as_primary: boolean
1209   @param as_primary: if we should make the block device
1210       read/write
1211
1212   @return: the assembled device or None (in case no device
1213       was assembled)
1214   @raise errors.BlockDeviceError: in case there is an error
1215       during the activation of the children or the device
1216       itself
1217
1218   """
1219   children = []
1220   if disk.children:
1221     mcn = disk.ChildrenNeeded()
1222     if mcn == -1:
1223       mcn = 0 # max number of Nones allowed
1224     else:
1225       mcn = len(disk.children) - mcn # max number of Nones
1226     for chld_disk in disk.children:
1227       try:
1228         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1229       except errors.BlockDeviceError, err:
1230         if children.count(None) >= mcn:
1231           raise
1232         cdev = None
1233         logging.error("Error in child activation (but continuing): %s",
1234                       str(err))
1235       children.append(cdev)
1236
1237   if as_primary or disk.AssembleOnSecondary():
1238     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1239     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1240     result = r_dev
1241     if as_primary or disk.OpenOnSecondary():
1242       r_dev.Open()
1243     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1244                                 as_primary, disk.iv_name)
1245
1246   else:
1247     result = True
1248   return result
1249
1250
1251 def BlockdevAssemble(disk, owner, as_primary):
1252   """Activate a block device for an instance.
1253
1254   This is a wrapper over _RecursiveAssembleBD.
1255
1256   @rtype: str or boolean
1257   @return: a C{/dev/...} path for primary nodes, and
1258       C{True} for secondary nodes
1259
1260   """
1261   status = True
1262   result = "no error information"
1263   try:
1264     result = _RecursiveAssembleBD(disk, owner, as_primary)
1265     if isinstance(result, bdev.BlockDev):
1266       result = result.dev_path
1267   except errors.BlockDeviceError, err:
1268     result = "Error while assembling disk: %s" % str(err)
1269     status = False
1270   return (status, result)
1271
1272
1273 def BlockdevShutdown(disk):
1274   """Shut down a block device.
1275
1276   First, if the device is assembled (Attach() is successful), then
1277   the device is shutdown. Then the children of the device are
1278   shutdown.
1279
1280   This function is called recursively. Note that we don't cache the
1281   children or such, as oppossed to assemble, shutdown of different
1282   devices doesn't require that the upper device was active.
1283
1284   @type disk: L{objects.Disk}
1285   @param disk: the description of the disk we should
1286       shutdown
1287   @rtype: boolean
1288   @return: the success of the operation
1289
1290   """
1291   msgs = []
1292   result = True
1293   r_dev = _RecursiveFindBD(disk)
1294   if r_dev is not None:
1295     r_path = r_dev.dev_path
1296     try:
1297       r_dev.Shutdown()
1298       DevCacheManager.RemoveCache(r_path)
1299     except errors.BlockDeviceError, err:
1300       msgs.append(str(err))
1301       result = False
1302
1303   if disk.children:
1304     for child in disk.children:
1305       c_status, c_msg = BlockdevShutdown(child)
1306       result = result and c_status
1307       if c_msg: # not an empty message
1308         msgs.append(c_msg)
1309
1310   return (result, "; ".join(msgs))
1311
1312
1313 def BlockdevAddchildren(parent_cdev, new_cdevs):
1314   """Extend a mirrored block device.
1315
1316   @type parent_cdev: L{objects.Disk}
1317   @param parent_cdev: the disk to which we should add children
1318   @type new_cdevs: list of L{objects.Disk}
1319   @param new_cdevs: the list of children which we should add
1320   @rtype: boolean
1321   @return: the success of the operation
1322
1323   """
1324   parent_bdev = _RecursiveFindBD(parent_cdev)
1325   if parent_bdev is None:
1326     logging.error("Can't find parent device")
1327     return False
1328   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1329   if new_bdevs.count(None) > 0:
1330     logging.error("Can't find new device(s) to add: %s:%s",
1331                   new_bdevs, new_cdevs)
1332     return False
1333   parent_bdev.AddChildren(new_bdevs)
1334   return True
1335
1336
1337 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1338   """Shrink a mirrored block device.
1339
1340   @type parent_cdev: L{objects.Disk}
1341   @param parent_cdev: the disk from which we should remove children
1342   @type new_cdevs: list of L{objects.Disk}
1343   @param new_cdevs: the list of children which we should remove
1344   @rtype: boolean
1345   @return: the success of the operation
1346
1347   """
1348   parent_bdev = _RecursiveFindBD(parent_cdev)
1349   if parent_bdev is None:
1350     logging.error("Can't find parent in remove children: %s", parent_cdev)
1351     return False
1352   devs = []
1353   for disk in new_cdevs:
1354     rpath = disk.StaticDevPath()
1355     if rpath is None:
1356       bd = _RecursiveFindBD(disk)
1357       if bd is None:
1358         logging.error("Can't find dynamic device %s while removing children",
1359                       disk)
1360         return False
1361       else:
1362         devs.append(bd.dev_path)
1363     else:
1364       devs.append(rpath)
1365   parent_bdev.RemoveChildren(devs)
1366   return True
1367
1368
1369 def BlockdevGetmirrorstatus(disks):
1370   """Get the mirroring status of a list of devices.
1371
1372   @type disks: list of L{objects.Disk}
1373   @param disks: the list of disks which we should query
1374   @rtype: disk
1375   @return:
1376       a list of (mirror_done, estimated_time) tuples, which
1377       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1378   @raise errors.BlockDeviceError: if any of the disks cannot be
1379       found
1380
1381   """
1382   stats = []
1383   for dsk in disks:
1384     rbd = _RecursiveFindBD(dsk)
1385     if rbd is None:
1386       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1387     stats.append(rbd.CombinedSyncStatus())
1388   return stats
1389
1390
1391 def _RecursiveFindBD(disk):
1392   """Check if a device is activated.
1393
1394   If so, return information about the real device.
1395
1396   @type disk: L{objects.Disk}
1397   @param disk: the disk object we need to find
1398
1399   @return: None if the device can't be found,
1400       otherwise the device instance
1401
1402   """
1403   children = []
1404   if disk.children:
1405     for chdisk in disk.children:
1406       children.append(_RecursiveFindBD(chdisk))
1407
1408   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1409
1410
1411 def BlockdevFind(disk):
1412   """Check if a device is activated.
1413
1414   If it is, return information about the real device.
1415
1416   @type disk: L{objects.Disk}
1417   @param disk: the disk to find
1418   @rtype: None or tuple
1419   @return: None if the disk cannot be found, otherwise a
1420       tuple (device_path, major, minor, sync_percent,
1421       estimated_time, is_degraded)
1422
1423   """
1424   try:
1425     rbd = _RecursiveFindBD(disk)
1426   except errors.BlockDeviceError, err:
1427     return (False, str(err))
1428   if rbd is None:
1429     return (True, None)
1430   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1431
1432
1433 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1434   """Write a file to the filesystem.
1435
1436   This allows the master to overwrite(!) a file. It will only perform
1437   the operation if the file belongs to a list of configuration files.
1438
1439   @type file_name: str
1440   @param file_name: the target file name
1441   @type data: str
1442   @param data: the new contents of the file
1443   @type mode: int
1444   @param mode: the mode to give the file (can be None)
1445   @type uid: int
1446   @param uid: the owner of the file (can be -1 for default)
1447   @type gid: int
1448   @param gid: the group of the file (can be -1 for default)
1449   @type atime: float
1450   @param atime: the atime to set on the file (can be None)
1451   @type mtime: float
1452   @param mtime: the mtime to set on the file (can be None)
1453   @rtype: boolean
1454   @return: the success of the operation; errors are logged
1455       in the node daemon log
1456
1457   """
1458   if not os.path.isabs(file_name):
1459     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1460                   file_name)
1461     return False
1462
1463   allowed_files = [
1464     constants.CLUSTER_CONF_FILE,
1465     constants.ETC_HOSTS,
1466     constants.SSH_KNOWN_HOSTS_FILE,
1467     constants.VNC_PASSWORD_FILE,
1468     ]
1469
1470   if file_name not in allowed_files:
1471     logging.error("Filename passed to UploadFile not in allowed"
1472                  " upload targets: '%s'", file_name)
1473     return False
1474
1475   raw_data = _Decompress(data)
1476
1477   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1478                   atime=atime, mtime=mtime)
1479   return True
1480
1481
1482 def WriteSsconfFiles(values):
1483   """Update all ssconf files.
1484
1485   Wrapper around the SimpleStore.WriteFiles.
1486
1487   """
1488   ssconf.SimpleStore().WriteFiles(values)
1489
1490
1491 def _ErrnoOrStr(err):
1492   """Format an EnvironmentError exception.
1493
1494   If the L{err} argument has an errno attribute, it will be looked up
1495   and converted into a textual C{E...} description. Otherwise the
1496   string representation of the error will be returned.
1497
1498   @type err: L{EnvironmentError}
1499   @param err: the exception to format
1500
1501   """
1502   if hasattr(err, 'errno'):
1503     detail = errno.errorcode[err.errno]
1504   else:
1505     detail = str(err)
1506   return detail
1507
1508
1509 def _OSOndiskVersion(name, os_dir):
1510   """Compute and return the API version of a given OS.
1511
1512   This function will try to read the API version of the OS given by
1513   the 'name' parameter and residing in the 'os_dir' directory.
1514
1515   @type name: str
1516   @param name: the OS name we should look for
1517   @type os_dir: str
1518   @param os_dir: the directory inwhich we should look for the OS
1519   @rtype: int or None
1520   @return:
1521       Either an integer denoting the version or None in the
1522       case when this is not a valid OS name.
1523   @raise errors.InvalidOS: if the OS cannot be found
1524
1525   """
1526   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1527
1528   try:
1529     st = os.stat(api_file)
1530   except EnvironmentError, err:
1531     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1532                            " found (%s)" % _ErrnoOrStr(err))
1533
1534   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1535     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1536                            " a regular file")
1537
1538   try:
1539     f = open(api_file)
1540     try:
1541       api_versions = f.readlines()
1542     finally:
1543       f.close()
1544   except EnvironmentError, err:
1545     raise errors.InvalidOS(name, os_dir, "error while reading the"
1546                            " API version (%s)" % _ErrnoOrStr(err))
1547
1548   api_versions = [version.strip() for version in api_versions]
1549   try:
1550     api_versions = [int(version) for version in api_versions]
1551   except (TypeError, ValueError), err:
1552     raise errors.InvalidOS(name, os_dir,
1553                            "API version is not integer (%s)" % str(err))
1554
1555   return api_versions
1556
1557
1558 def DiagnoseOS(top_dirs=None):
1559   """Compute the validity for all OSes.
1560
1561   @type top_dirs: list
1562   @param top_dirs: the list of directories in which to
1563       search (if not given defaults to
1564       L{constants.OS_SEARCH_PATH})
1565   @rtype: list of L{objects.OS}
1566   @return: an OS object for each name in all the given
1567       directories
1568
1569   """
1570   if top_dirs is None:
1571     top_dirs = constants.OS_SEARCH_PATH
1572
1573   result = []
1574   for dir_name in top_dirs:
1575     if os.path.isdir(dir_name):
1576       try:
1577         f_names = utils.ListVisibleFiles(dir_name)
1578       except EnvironmentError, err:
1579         logging.exception("Can't list the OS directory %s", dir_name)
1580         break
1581       for name in f_names:
1582         try:
1583           os_inst = OSFromDisk(name, base_dir=dir_name)
1584           result.append(os_inst)
1585         except errors.InvalidOS, err:
1586           result.append(objects.OS.FromInvalidOS(err))
1587
1588   return result
1589
1590
1591 def OSFromDisk(name, base_dir=None):
1592   """Create an OS instance from disk.
1593
1594   This function will return an OS instance if the given name is a
1595   valid OS name. Otherwise, it will raise an appropriate
1596   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1597
1598   @type base_dir: string
1599   @keyword base_dir: Base directory containing OS installations.
1600                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1601   @rtype: L{objects.OS}
1602   @return: the OS instance if we find a valid one
1603   @raise errors.InvalidOS: if we don't find a valid OS
1604
1605   """
1606   if base_dir is None:
1607     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1608     if os_dir is None:
1609       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1610   else:
1611     os_dir = os.path.sep.join([base_dir, name])
1612
1613   api_versions = _OSOndiskVersion(name, os_dir)
1614
1615   if constants.OS_API_VERSION not in api_versions:
1616     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1617                            " (found %s want %s)"
1618                            % (api_versions, constants.OS_API_VERSION))
1619
1620   # OS Scripts dictionary, we will populate it with the actual script names
1621   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1622
1623   for script in os_scripts:
1624     os_scripts[script] = os.path.sep.join([os_dir, script])
1625
1626     try:
1627       st = os.stat(os_scripts[script])
1628     except EnvironmentError, err:
1629       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1630                              (script, _ErrnoOrStr(err)))
1631
1632     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1633       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1634                              script)
1635
1636     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1637       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1638                              script)
1639
1640
1641   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1642                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1643                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1644                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1645                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1646                     api_versions=api_versions)
1647
1648 def OSEnvironment(instance, debug=0):
1649   """Calculate the environment for an os script.
1650
1651   @type instance: L{objects.Instance}
1652   @param instance: target instance for the os script run
1653   @type debug: integer
1654   @param debug: debug level (0 or 1, for OS Api 10)
1655   @rtype: dict
1656   @return: dict of environment variables
1657   @raise errors.BlockDeviceError: if the block device
1658       cannot be found
1659
1660   """
1661   result = {}
1662   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1663   result['INSTANCE_NAME'] = instance.name
1664   result['INSTANCE_OS'] = instance.os
1665   result['HYPERVISOR'] = instance.hypervisor
1666   result['DISK_COUNT'] = '%d' % len(instance.disks)
1667   result['NIC_COUNT'] = '%d' % len(instance.nics)
1668   result['DEBUG_LEVEL'] = '%d' % debug
1669   for idx, disk in enumerate(instance.disks):
1670     real_disk = _RecursiveFindBD(disk)
1671     if real_disk is None:
1672       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1673                                     str(disk))
1674     real_disk.Open()
1675     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1676     result['DISK_%d_ACCESS' % idx] = disk.mode
1677     if constants.HV_DISK_TYPE in instance.hvparams:
1678       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1679         instance.hvparams[constants.HV_DISK_TYPE]
1680     if disk.dev_type in constants.LDS_BLOCK:
1681       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1682     elif disk.dev_type == constants.LD_FILE:
1683       result['DISK_%d_BACKEND_TYPE' % idx] = \
1684         'file:%s' % disk.physical_id[0]
1685   for idx, nic in enumerate(instance.nics):
1686     result['NIC_%d_MAC' % idx] = nic.mac
1687     if nic.ip:
1688       result['NIC_%d_IP' % idx] = nic.ip
1689     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1690     if constants.HV_NIC_TYPE in instance.hvparams:
1691       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1692         instance.hvparams[constants.HV_NIC_TYPE]
1693
1694   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1695     for key, value in source.items():
1696       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1697
1698   return result
1699
1700 def BlockdevGrow(disk, amount):
1701   """Grow a stack of block devices.
1702
1703   This function is called recursively, with the childrens being the
1704   first ones to resize.
1705
1706   @type disk: L{objects.Disk}
1707   @param disk: the disk to be grown
1708   @rtype: (status, result)
1709   @return: a tuple with the status of the operation
1710       (True/False), and the errors message if status
1711       is False
1712
1713   """
1714   r_dev = _RecursiveFindBD(disk)
1715   if r_dev is None:
1716     return False, "Cannot find block device %s" % (disk,)
1717
1718   try:
1719     r_dev.Grow(amount)
1720   except errors.BlockDeviceError, err:
1721     return False, str(err)
1722
1723   return True, None
1724
1725
1726 def BlockdevSnapshot(disk):
1727   """Create a snapshot copy of a block device.
1728
1729   This function is called recursively, and the snapshot is actually created
1730   just for the leaf lvm backend device.
1731
1732   @type disk: L{objects.Disk}
1733   @param disk: the disk to be snapshotted
1734   @rtype: string
1735   @return: snapshot disk path
1736
1737   """
1738   if disk.children:
1739     if len(disk.children) == 1:
1740       # only one child, let's recurse on it
1741       return BlockdevSnapshot(disk.children[0])
1742     else:
1743       # more than one child, choose one that matches
1744       for child in disk.children:
1745         if child.size == disk.size:
1746           # return implies breaking the loop
1747           return BlockdevSnapshot(child)
1748   elif disk.dev_type == constants.LD_LV:
1749     r_dev = _RecursiveFindBD(disk)
1750     if r_dev is not None:
1751       # let's stay on the safe side and ask for the full size, for now
1752       return r_dev.Snapshot(disk.size)
1753     else:
1754       return None
1755   else:
1756     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1757                                  " '%s' of type '%s'" %
1758                                  (disk.unique_id, disk.dev_type))
1759
1760
1761 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1762   """Export a block device snapshot to a remote node.
1763
1764   @type disk: L{objects.Disk}
1765   @param disk: the description of the disk to export
1766   @type dest_node: str
1767   @param dest_node: the destination node to export to
1768   @type instance: L{objects.Instance}
1769   @param instance: the instance object to whom the disk belongs
1770   @type cluster_name: str
1771   @param cluster_name: the cluster name, needed for SSH hostalias
1772   @type idx: int
1773   @param idx: the index of the disk in the instance's disk list,
1774       used to export to the OS scripts environment
1775   @rtype: boolean
1776   @return: the success of the operation
1777
1778   """
1779   export_env = OSEnvironment(instance)
1780
1781   inst_os = OSFromDisk(instance.os)
1782   export_script = inst_os.export_script
1783
1784   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1785                                      instance.name, int(time.time()))
1786   if not os.path.exists(constants.LOG_OS_DIR):
1787     os.mkdir(constants.LOG_OS_DIR, 0750)
1788   real_disk = _RecursiveFindBD(disk)
1789   if real_disk is None:
1790     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1791                                   str(disk))
1792   real_disk.Open()
1793
1794   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1795   export_env['EXPORT_INDEX'] = str(idx)
1796
1797   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1798   destfile = disk.physical_id[1]
1799
1800   # the target command is built out of three individual commands,
1801   # which are joined by pipes; we check each individual command for
1802   # valid parameters
1803   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1804                                export_script, logfile)
1805
1806   comprcmd = "gzip"
1807
1808   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1809                                 destdir, destdir, destfile)
1810   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1811                                                    constants.GANETI_RUNAS,
1812                                                    destcmd)
1813
1814   # all commands have been checked, so we're safe to combine them
1815   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1816
1817   result = utils.RunCmd(command, env=export_env)
1818
1819   if result.failed:
1820     logging.error("os snapshot export command '%s' returned error: %s"
1821                   " output: %s", command, result.fail_reason, result.output)
1822     return False
1823
1824   return True
1825
1826
1827 def FinalizeExport(instance, snap_disks):
1828   """Write out the export configuration information.
1829
1830   @type instance: L{objects.Instance}
1831   @param instance: the instance which we export, used for
1832       saving configuration
1833   @type snap_disks: list of L{objects.Disk}
1834   @param snap_disks: list of snapshot block devices, which
1835       will be used to get the actual name of the dump file
1836
1837   @rtype: boolean
1838   @return: the success of the operation
1839
1840   """
1841   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1842   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1843
1844   config = objects.SerializableConfigParser()
1845
1846   config.add_section(constants.INISECT_EXP)
1847   config.set(constants.INISECT_EXP, 'version', '0')
1848   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1849   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1850   config.set(constants.INISECT_EXP, 'os', instance.os)
1851   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1852
1853   config.add_section(constants.INISECT_INS)
1854   config.set(constants.INISECT_INS, 'name', instance.name)
1855   config.set(constants.INISECT_INS, 'memory', '%d' %
1856              instance.beparams[constants.BE_MEMORY])
1857   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1858              instance.beparams[constants.BE_VCPUS])
1859   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1860
1861   nic_total = 0
1862   for nic_count, nic in enumerate(instance.nics):
1863     nic_total += 1
1864     config.set(constants.INISECT_INS, 'nic%d_mac' %
1865                nic_count, '%s' % nic.mac)
1866     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1867     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1868                '%s' % nic.bridge)
1869   # TODO: redundant: on load can read nics until it doesn't exist
1870   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1871
1872   disk_total = 0
1873   for disk_count, disk in enumerate(snap_disks):
1874     if disk:
1875       disk_total += 1
1876       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1877                  ('%s' % disk.iv_name))
1878       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1879                  ('%s' % disk.physical_id[1]))
1880       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1881                  ('%d' % disk.size))
1882
1883   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1884
1885   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1886                   data=config.Dumps())
1887   shutil.rmtree(finaldestdir, True)
1888   shutil.move(destdir, finaldestdir)
1889
1890   return True
1891
1892
1893 def ExportInfo(dest):
1894   """Get export configuration information.
1895
1896   @type dest: str
1897   @param dest: directory containing the export
1898
1899   @rtype: L{objects.SerializableConfigParser}
1900   @return: a serializable config file containing the
1901       export info
1902
1903   """
1904   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1905
1906   config = objects.SerializableConfigParser()
1907   config.read(cff)
1908
1909   if (not config.has_section(constants.INISECT_EXP) or
1910       not config.has_section(constants.INISECT_INS)):
1911     return None
1912
1913   return config
1914
1915
1916 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1917   """Import an os image into an instance.
1918
1919   @type instance: L{objects.Instance}
1920   @param instance: instance to import the disks into
1921   @type src_node: string
1922   @param src_node: source node for the disk images
1923   @type src_images: list of string
1924   @param src_images: absolute paths of the disk images
1925   @rtype: list of boolean
1926   @return: each boolean represent the success of importing the n-th disk
1927
1928   """
1929   import_env = OSEnvironment(instance)
1930   inst_os = OSFromDisk(instance.os)
1931   import_script = inst_os.import_script
1932
1933   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1934                                         instance.name, int(time.time()))
1935   if not os.path.exists(constants.LOG_OS_DIR):
1936     os.mkdir(constants.LOG_OS_DIR, 0750)
1937
1938   comprcmd = "gunzip"
1939   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1940                                import_script, logfile)
1941
1942   final_result = []
1943   for idx, image in enumerate(src_images):
1944     if image:
1945       destcmd = utils.BuildShellCmd('cat %s', image)
1946       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1947                                                        constants.GANETI_RUNAS,
1948                                                        destcmd)
1949       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1950       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1951       import_env['IMPORT_INDEX'] = str(idx)
1952       result = utils.RunCmd(command, env=import_env)
1953       if result.failed:
1954         logging.error("Disk import command '%s' returned error: %s"
1955                       " output: %s", command, result.fail_reason,
1956                       result.output)
1957         final_result.append(False)
1958       else:
1959         final_result.append(True)
1960     else:
1961       final_result.append(True)
1962
1963   return final_result
1964
1965
1966 def ListExports():
1967   """Return a list of exports currently available on this machine.
1968
1969   @rtype: list
1970   @return: list of the exports
1971
1972   """
1973   if os.path.isdir(constants.EXPORT_DIR):
1974     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1975   else:
1976     return []
1977
1978
1979 def RemoveExport(export):
1980   """Remove an existing export from the node.
1981
1982   @type export: str
1983   @param export: the name of the export to remove
1984   @rtype: boolean
1985   @return: the success of the operation
1986
1987   """
1988   target = os.path.join(constants.EXPORT_DIR, export)
1989
1990   shutil.rmtree(target)
1991   # TODO: catch some of the relevant exceptions and provide a pretty
1992   # error message if rmtree fails.
1993
1994   return True
1995
1996
1997 def BlockdevRename(devlist):
1998   """Rename a list of block devices.
1999
2000   @type devlist: list of tuples
2001   @param devlist: list of tuples of the form  (disk,
2002       new_logical_id, new_physical_id); disk is an
2003       L{objects.Disk} object describing the current disk,
2004       and new logical_id/physical_id is the name we
2005       rename it to
2006   @rtype: boolean
2007   @return: True if all renames succeeded, False otherwise
2008
2009   """
2010   result = True
2011   for disk, unique_id in devlist:
2012     dev = _RecursiveFindBD(disk)
2013     if dev is None:
2014       result = False
2015       continue
2016     try:
2017       old_rpath = dev.dev_path
2018       dev.Rename(unique_id)
2019       new_rpath = dev.dev_path
2020       if old_rpath != new_rpath:
2021         DevCacheManager.RemoveCache(old_rpath)
2022         # FIXME: we should add the new cache information here, like:
2023         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2024         # but we don't have the owner here - maybe parse from existing
2025         # cache? for now, we only lose lvm data when we rename, which
2026         # is less critical than DRBD or MD
2027     except errors.BlockDeviceError:
2028       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2029       result = False
2030   return result
2031
2032
2033 def _TransformFileStorageDir(file_storage_dir):
2034   """Checks whether given file_storage_dir is valid.
2035
2036   Checks wheter the given file_storage_dir is within the cluster-wide
2037   default file_storage_dir stored in SimpleStore. Only paths under that
2038   directory are allowed.
2039
2040   @type file_storage_dir: str
2041   @param file_storage_dir: the path to check
2042
2043   @return: the normalized path if valid, None otherwise
2044
2045   """
2046   cfg = _GetConfig()
2047   file_storage_dir = os.path.normpath(file_storage_dir)
2048   base_file_storage_dir = cfg.GetFileStorageDir()
2049   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2050       base_file_storage_dir):
2051     logging.error("file storage directory '%s' is not under base file"
2052                   " storage directory '%s'",
2053                   file_storage_dir, base_file_storage_dir)
2054     return None
2055   return file_storage_dir
2056
2057
2058 def CreateFileStorageDir(file_storage_dir):
2059   """Create file storage directory.
2060
2061   @type file_storage_dir: str
2062   @param file_storage_dir: directory to create
2063
2064   @rtype: tuple
2065   @return: tuple with first element a boolean indicating wheter dir
2066       creation was successful or not
2067
2068   """
2069   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2070   result = True,
2071   if not file_storage_dir:
2072     result = False,
2073   else:
2074     if os.path.exists(file_storage_dir):
2075       if not os.path.isdir(file_storage_dir):
2076         logging.error("'%s' is not a directory", file_storage_dir)
2077         result = False,
2078     else:
2079       try:
2080         os.makedirs(file_storage_dir, 0750)
2081       except OSError, err:
2082         logging.error("Cannot create file storage directory '%s': %s",
2083                       file_storage_dir, err)
2084         result = False,
2085   return result
2086
2087
2088 def RemoveFileStorageDir(file_storage_dir):
2089   """Remove file storage directory.
2090
2091   Remove it only if it's empty. If not log an error and return.
2092
2093   @type file_storage_dir: str
2094   @param file_storage_dir: the directory we should cleanup
2095   @rtype: tuple (success,)
2096   @return: tuple of one element, C{success}, denoting
2097       whether the operation was successful
2098
2099   """
2100   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2101   result = True,
2102   if not file_storage_dir:
2103     result = False,
2104   else:
2105     if os.path.exists(file_storage_dir):
2106       if not os.path.isdir(file_storage_dir):
2107         logging.error("'%s' is not a directory", file_storage_dir)
2108         result = False,
2109       # deletes dir only if empty, otherwise we want to return False
2110       try:
2111         os.rmdir(file_storage_dir)
2112       except OSError:
2113         logging.exception("Cannot remove file storage directory '%s'",
2114                           file_storage_dir)
2115         result = False,
2116   return result
2117
2118
2119 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2120   """Rename the file storage directory.
2121
2122   @type old_file_storage_dir: str
2123   @param old_file_storage_dir: the current path
2124   @type new_file_storage_dir: str
2125   @param new_file_storage_dir: the name we should rename to
2126   @rtype: tuple (success,)
2127   @return: tuple of one element, C{success}, denoting
2128       whether the operation was successful
2129
2130   """
2131   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2132   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2133   result = True,
2134   if not old_file_storage_dir or not new_file_storage_dir:
2135     result = False,
2136   else:
2137     if not os.path.exists(new_file_storage_dir):
2138       if os.path.isdir(old_file_storage_dir):
2139         try:
2140           os.rename(old_file_storage_dir, new_file_storage_dir)
2141         except OSError:
2142           logging.exception("Cannot rename '%s' to '%s'",
2143                             old_file_storage_dir, new_file_storage_dir)
2144           result =  False,
2145       else:
2146         logging.error("'%s' is not a directory", old_file_storage_dir)
2147         result = False,
2148     else:
2149       if os.path.exists(old_file_storage_dir):
2150         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2151                       old_file_storage_dir, new_file_storage_dir)
2152         result = False,
2153   return result
2154
2155
2156 def _IsJobQueueFile(file_name):
2157   """Checks whether the given filename is in the queue directory.
2158
2159   @type file_name: str
2160   @param file_name: the file name we should check
2161   @rtype: boolean
2162   @return: whether the file is under the queue directory
2163
2164   """
2165   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2166   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2167
2168   if not result:
2169     logging.error("'%s' is not a file in the queue directory",
2170                   file_name)
2171
2172   return result
2173
2174
2175 def JobQueueUpdate(file_name, content):
2176   """Updates a file in the queue directory.
2177
2178   This is just a wrapper over L{utils.WriteFile}, with proper
2179   checking.
2180
2181   @type file_name: str
2182   @param file_name: the job file name
2183   @type content: str
2184   @param content: the new job contents
2185   @rtype: boolean
2186   @return: the success of the operation
2187
2188   """
2189   if not _IsJobQueueFile(file_name):
2190     return False
2191
2192   # Write and replace the file atomically
2193   utils.WriteFile(file_name, data=_Decompress(content))
2194
2195   return True
2196
2197
2198 def JobQueueRename(old, new):
2199   """Renames a job queue file.
2200
2201   This is just a wrapper over os.rename with proper checking.
2202
2203   @type old: str
2204   @param old: the old (actual) file name
2205   @type new: str
2206   @param new: the desired file name
2207   @rtype: boolean
2208   @return: the success of the operation
2209
2210   """
2211   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2212     return False
2213
2214   utils.RenameFile(old, new, mkdir=True)
2215
2216   return True
2217
2218
2219 def JobQueueSetDrainFlag(drain_flag):
2220   """Set the drain flag for the queue.
2221
2222   This will set or unset the queue drain flag.
2223
2224   @type drain_flag: boolean
2225   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2226   @rtype: boolean
2227   @return: always True
2228   @warning: the function always returns True
2229
2230   """
2231   if drain_flag:
2232     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2233   else:
2234     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2235
2236   return True
2237
2238
2239 def BlockdevClose(instance_name, disks):
2240   """Closes the given block devices.
2241
2242   This means they will be switched to secondary mode (in case of
2243   DRBD).
2244
2245   @param instance_name: if the argument is not empty, the symlinks
2246       of this instance will be removed
2247   @type disks: list of L{objects.Disk}
2248   @param disks: the list of disks to be closed
2249   @rtype: tuple (success, message)
2250   @return: a tuple of success and message, where success
2251       indicates the succes of the operation, and message
2252       which will contain the error details in case we
2253       failed
2254
2255   """
2256   bdevs = []
2257   for cf in disks:
2258     rd = _RecursiveFindBD(cf)
2259     if rd is None:
2260       return (False, "Can't find device %s" % cf)
2261     bdevs.append(rd)
2262
2263   msg = []
2264   for rd in bdevs:
2265     try:
2266       rd.Close()
2267     except errors.BlockDeviceError, err:
2268       msg.append(str(err))
2269   if msg:
2270     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2271   else:
2272     if instance_name:
2273       _RemoveBlockDevLinks(instance_name, disks)
2274     return (True, "All devices secondary")
2275
2276
2277 def ValidateHVParams(hvname, hvparams):
2278   """Validates the given hypervisor parameters.
2279
2280   @type hvname: string
2281   @param hvname: the hypervisor name
2282   @type hvparams: dict
2283   @param hvparams: the hypervisor parameters to be validated
2284   @rtype: tuple (success, message)
2285   @return: a tuple of success and message, where success
2286       indicates the succes of the operation, and message
2287       which will contain the error details in case we
2288       failed
2289
2290   """
2291   try:
2292     hv_type = hypervisor.GetHypervisor(hvname)
2293     hv_type.ValidateParameters(hvparams)
2294     return (True, "Validation passed")
2295   except errors.HypervisorError, err:
2296     return (False, str(err))
2297
2298
2299 def DemoteFromMC():
2300   """Demotes the current node from master candidate role.
2301
2302   """
2303   # try to ensure we're not the master by mistake
2304   master, myself = ssconf.GetMasterAndMyself()
2305   if master == myself:
2306     return (False, "ssconf status shows I'm the master node, will not demote")
2307   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2308   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2309     return (False, "The master daemon is running, will not demote")
2310   try:
2311     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2312       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2313   except EnvironmentError, err:
2314     if err.errno != errno.ENOENT:
2315       return (False, "Error while backing up cluster file: %s" % str(err))
2316   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2317   return (True, "Done")
2318
2319
2320 def _FindDisks(nodes_ip, disks):
2321   """Sets the physical ID on disks and returns the block devices.
2322
2323   """
2324   # set the correct physical ID
2325   my_name = utils.HostInfo().name
2326   for cf in disks:
2327     cf.SetPhysicalID(my_name, nodes_ip)
2328
2329   bdevs = []
2330
2331   for cf in disks:
2332     rd = _RecursiveFindBD(cf)
2333     if rd is None:
2334       return (False, "Can't find device %s" % cf)
2335     bdevs.append(rd)
2336   return (True, bdevs)
2337
2338
2339 def DrbdDisconnectNet(nodes_ip, disks):
2340   """Disconnects the network on a list of drbd devices.
2341
2342   """
2343   status, bdevs = _FindDisks(nodes_ip, disks)
2344   if not status:
2345     return status, bdevs
2346
2347   # disconnect disks
2348   for rd in bdevs:
2349     try:
2350       rd.DisconnectNet()
2351     except errors.BlockDeviceError, err:
2352       logging.exception("Failed to go into standalone mode")
2353       return (False, "Can't change network configuration: %s" % str(err))
2354   return (True, "All disks are now disconnected")
2355
2356
2357 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2358   """Attaches the network on a list of drbd devices.
2359
2360   """
2361   status, bdevs = _FindDisks(nodes_ip, disks)
2362   if not status:
2363     return status, bdevs
2364
2365   if multimaster:
2366     for idx, rd in enumerate(bdevs):
2367       try:
2368         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2369       except EnvironmentError, err:
2370         return (False, "Can't create symlink: %s" % str(err))
2371   # reconnect disks, switch to new master configuration and if
2372   # needed primary mode
2373   for rd in bdevs:
2374     try:
2375       rd.AttachNet(multimaster)
2376     except errors.BlockDeviceError, err:
2377       return (False, "Can't change network configuration: %s" % str(err))
2378   # wait until the disks are connected; we need to retry the re-attach
2379   # if the device becomes standalone, as this might happen if the one
2380   # node disconnects and reconnects in a different mode before the
2381   # other node reconnects; in this case, one or both of the nodes will
2382   # decide it has wrong configuration and switch to standalone
2383   RECONNECT_TIMEOUT = 2 * 60
2384   sleep_time = 0.100 # start with 100 miliseconds
2385   timeout_limit = time.time() + RECONNECT_TIMEOUT
2386   while time.time() < timeout_limit:
2387     all_connected = True
2388     for rd in bdevs:
2389       stats = rd.GetProcStatus()
2390       if not (stats.is_connected or stats.is_in_resync):
2391         all_connected = False
2392       if stats.is_standalone:
2393         # peer had different config info and this node became
2394         # standalone, even though this should not happen with the
2395         # new staged way of changing disk configs
2396         try:
2397           rd.AttachNet(multimaster)
2398         except errors.BlockDeviceError, err:
2399           return (False, "Can't change network configuration: %s" % str(err))
2400     if all_connected:
2401       break
2402     time.sleep(sleep_time)
2403     sleep_time = min(5, sleep_time * 1.5)
2404   if not all_connected:
2405     return (False, "Timeout in disk reconnecting")
2406   if multimaster:
2407     # change to primary mode
2408     for rd in bdevs:
2409       try:
2410         rd.Open()
2411       except errors.BlockDeviceError, err:
2412         return (False, "Can't change to primary mode: %s" % str(err))
2413   if multimaster:
2414     msg = "multi-master and primary"
2415   else:
2416     msg = "single-master"
2417   return (True, "Disks are now configured as %s" % msg)
2418
2419
2420 def DrbdWaitSync(nodes_ip, disks):
2421   """Wait until DRBDs have synchronized.
2422
2423   """
2424   status, bdevs = _FindDisks(nodes_ip, disks)
2425   if not status:
2426     return status, bdevs
2427
2428   min_resync = 100
2429   alldone = True
2430   failure = False
2431   for rd in bdevs:
2432     stats = rd.GetProcStatus()
2433     if not (stats.is_connected or stats.is_in_resync):
2434       failure = True
2435       break
2436     alldone = alldone and (not stats.is_in_resync)
2437     if stats.sync_percent is not None:
2438       min_resync = min(min_resync, stats.sync_percent)
2439   return (not failure, (alldone, min_resync))
2440
2441
2442 class HooksRunner(object):
2443   """Hook runner.
2444
2445   This class is instantiated on the node side (ganeti-noded) and not
2446   on the master side.
2447
2448   """
2449   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2450
2451   def __init__(self, hooks_base_dir=None):
2452     """Constructor for hooks runner.
2453
2454     @type hooks_base_dir: str or None
2455     @param hooks_base_dir: if not None, this overrides the
2456         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2457
2458     """
2459     if hooks_base_dir is None:
2460       hooks_base_dir = constants.HOOKS_BASE_DIR
2461     self._BASE_DIR = hooks_base_dir
2462
2463   @staticmethod
2464   def ExecHook(script, env):
2465     """Exec one hook script.
2466
2467     @type script: str
2468     @param script: the full path to the script
2469     @type env: dict
2470     @param env: the environment with which to exec the script
2471     @rtype: tuple (success, message)
2472     @return: a tuple of success and message, where success
2473         indicates the succes of the operation, and message
2474         which will contain the error details in case we
2475         failed
2476
2477     """
2478     # exec the process using subprocess and log the output
2479     fdstdin = None
2480     try:
2481       fdstdin = open("/dev/null", "r")
2482       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2483                                stderr=subprocess.STDOUT, close_fds=True,
2484                                shell=False, cwd="/", env=env)
2485       output = ""
2486       try:
2487         output = child.stdout.read(4096)
2488         child.stdout.close()
2489       except EnvironmentError, err:
2490         output += "Hook script error: %s" % str(err)
2491
2492       while True:
2493         try:
2494           result = child.wait()
2495           break
2496         except EnvironmentError, err:
2497           if err.errno == errno.EINTR:
2498             continue
2499           raise
2500     finally:
2501       # try not to leak fds
2502       for fd in (fdstdin, ):
2503         if fd is not None:
2504           try:
2505             fd.close()
2506           except EnvironmentError, err:
2507             # just log the error
2508             #logging.exception("Error while closing fd %s", fd)
2509             pass
2510
2511     return result == 0, utils.SafeEncode(output.strip())
2512
2513   def RunHooks(self, hpath, phase, env):
2514     """Run the scripts in the hooks directory.
2515
2516     @type hpath: str
2517     @param hpath: the path to the hooks directory which
2518         holds the scripts
2519     @type phase: str
2520     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2521         L{constants.HOOKS_PHASE_POST}
2522     @type env: dict
2523     @param env: dictionary with the environment for the hook
2524     @rtype: list
2525     @return: list of 3-element tuples:
2526       - script path
2527       - script result, either L{constants.HKR_SUCCESS} or
2528         L{constants.HKR_FAIL}
2529       - output of the script
2530
2531     @raise errors.ProgrammerError: for invalid input
2532         parameters
2533
2534     """
2535     if phase == constants.HOOKS_PHASE_PRE:
2536       suffix = "pre"
2537     elif phase == constants.HOOKS_PHASE_POST:
2538       suffix = "post"
2539     else:
2540       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2541     rr = []
2542
2543     subdir = "%s-%s.d" % (hpath, suffix)
2544     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2545     try:
2546       dir_contents = utils.ListVisibleFiles(dir_name)
2547     except OSError:
2548       # FIXME: must log output in case of failures
2549       return rr
2550
2551     # we use the standard python sort order,
2552     # so 00name is the recommended naming scheme
2553     dir_contents.sort()
2554     for relname in dir_contents:
2555       fname = os.path.join(dir_name, relname)
2556       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2557           self.RE_MASK.match(relname) is not None):
2558         rrval = constants.HKR_SKIP
2559         output = ""
2560       else:
2561         result, output = self.ExecHook(fname, env)
2562         if not result:
2563           rrval = constants.HKR_FAIL
2564         else:
2565           rrval = constants.HKR_SUCCESS
2566       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2567
2568     return rr
2569
2570
2571 class IAllocatorRunner(object):
2572   """IAllocator runner.
2573
2574   This class is instantiated on the node side (ganeti-noded) and not on
2575   the master side.
2576
2577   """
2578   def Run(self, name, idata):
2579     """Run an iallocator script.
2580
2581     @type name: str
2582     @param name: the iallocator script name
2583     @type idata: str
2584     @param idata: the allocator input data
2585
2586     @rtype: tuple
2587     @return: four element tuple of:
2588        - run status (one of the IARUN_ constants)
2589        - stdout
2590        - stderr
2591        - fail reason (as from L{utils.RunResult})
2592
2593     """
2594     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2595                                   os.path.isfile)
2596     if alloc_script is None:
2597       return (constants.IARUN_NOTFOUND, None, None, None)
2598
2599     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2600     try:
2601       os.write(fd, idata)
2602       os.close(fd)
2603       result = utils.RunCmd([alloc_script, fin_name])
2604       if result.failed:
2605         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2606                 result.fail_reason)
2607     finally:
2608       os.unlink(fin_name)
2609
2610     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2611
2612
2613 class DevCacheManager(object):
2614   """Simple class for managing a cache of block device information.
2615
2616   """
2617   _DEV_PREFIX = "/dev/"
2618   _ROOT_DIR = constants.BDEV_CACHE_DIR
2619
2620   @classmethod
2621   def _ConvertPath(cls, dev_path):
2622     """Converts a /dev/name path to the cache file name.
2623
2624     This replaces slashes with underscores and strips the /dev
2625     prefix. It then returns the full path to the cache file.
2626
2627     @type dev_path: str
2628     @param dev_path: the C{/dev/} path name
2629     @rtype: str
2630     @return: the converted path name
2631
2632     """
2633     if dev_path.startswith(cls._DEV_PREFIX):
2634       dev_path = dev_path[len(cls._DEV_PREFIX):]
2635     dev_path = dev_path.replace("/", "_")
2636     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2637     return fpath
2638
2639   @classmethod
2640   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2641     """Updates the cache information for a given device.
2642
2643     @type dev_path: str
2644     @param dev_path: the pathname of the device
2645     @type owner: str
2646     @param owner: the owner (instance name) of the device
2647     @type on_primary: bool
2648     @param on_primary: whether this is the primary
2649         node nor not
2650     @type iv_name: str
2651     @param iv_name: the instance-visible name of the
2652         device, as in objects.Disk.iv_name
2653
2654     @rtype: None
2655
2656     """
2657     if dev_path is None:
2658       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2659       return
2660     fpath = cls._ConvertPath(dev_path)
2661     if on_primary:
2662       state = "primary"
2663     else:
2664       state = "secondary"
2665     if iv_name is None:
2666       iv_name = "not_visible"
2667     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2668     try:
2669       utils.WriteFile(fpath, data=fdata)
2670     except EnvironmentError:
2671       logging.exception("Can't update bdev cache for %s", dev_path)
2672
2673   @classmethod
2674   def RemoveCache(cls, dev_path):
2675     """Remove data for a dev_path.
2676
2677     This is just a wrapper over L{utils.RemoveFile} with a converted
2678     path name and logging.
2679
2680     @type dev_path: str
2681     @param dev_path: the pathname of the device
2682
2683     @rtype: None
2684
2685     """
2686     if dev_path is None:
2687       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2688       return
2689     fpath = cls._ConvertPath(dev_path)
2690     try:
2691       utils.RemoveFile(fpath)
2692     except EnvironmentError:
2693       logging.exception("Can't update bdev cache for %s", dev_path)