Prevent race condition on MAC addresses
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     msg = "Error while processing user ssh files"
264     logging.exception(msg)
265     return (False, "%s: %s" % (msg, err))
266
267   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
268     utils.WriteFile(name, data=content, mode=0600)
269
270   utils.AddAuthorizedKey(auth_keys, sshpub)
271
272   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
273
274   return (True, "Node added successfully")
275
276
277 def LeaveCluster():
278   """Cleans up and remove the current node.
279
280   This function cleans up and prepares the current node to be removed
281   from the cluster.
282
283   If processing is successful, then it raises an
284   L{errors.QuitGanetiException} which is used as a special case to
285   shutdown the node daemon.
286
287   """
288   _CleanDirectory(constants.DATA_DIR)
289   JobQueuePurge()
290
291   try:
292     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
293   except errors.OpExecError:
294     logging.exception("Error while processing ssh files")
295     return
296
297   f = open(pub_key, 'r')
298   try:
299     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
300   finally:
301     f.close()
302
303   utils.RemoveFile(priv_key)
304   utils.RemoveFile(pub_key)
305
306   # Return a reassuring string to the caller, and quit
307   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
308
309
310 def GetNodeInfo(vgname, hypervisor_type):
311   """Gives back a hash with different informations about the node.
312
313   @type vgname: C{string}
314   @param vgname: the name of the volume group to ask for disk space information
315   @type hypervisor_type: C{str}
316   @param hypervisor_type: the name of the hypervisor to ask for
317       memory information
318   @rtype: C{dict}
319   @return: dictionary with the following keys:
320       - vg_size is the size of the configured volume group in MiB
321       - vg_free is the free size of the volume group in MiB
322       - memory_dom0 is the memory allocated for domain0 in MiB
323       - memory_free is the currently available (free) ram in MiB
324       - memory_total is the total number of ram in MiB
325
326   """
327   outputarray = {}
328   vginfo = _GetVGInfo(vgname)
329   outputarray['vg_size'] = vginfo['vg_size']
330   outputarray['vg_free'] = vginfo['vg_free']
331
332   hyper = hypervisor.GetHypervisor(hypervisor_type)
333   hyp_info = hyper.GetNodeInfo()
334   if hyp_info is not None:
335     outputarray.update(hyp_info)
336
337   f = open("/proc/sys/kernel/random/boot_id", 'r')
338   try:
339     outputarray["bootid"] = f.read(128).rstrip("\n")
340   finally:
341     f.close()
342
343   return outputarray
344
345
346 def VerifyNode(what, cluster_name):
347   """Verify the status of the local node.
348
349   Based on the input L{what} parameter, various checks are done on the
350   local node.
351
352   If the I{filelist} key is present, this list of
353   files is checksummed and the file/checksum pairs are returned.
354
355   If the I{nodelist} key is present, we check that we have
356   connectivity via ssh with the target nodes (and check the hostname
357   report).
358
359   If the I{node-net-test} key is present, we check that we have
360   connectivity to the given nodes via both primary IP and, if
361   applicable, secondary IPs.
362
363   @type what: C{dict}
364   @param what: a dictionary of things to check:
365       - filelist: list of files for which to compute checksums
366       - nodelist: list of nodes we should check ssh communication with
367       - node-net-test: list of nodes we should check node daemon port
368         connectivity with
369       - hypervisor: list with hypervisors to run the verify for
370   @rtype: dict
371   @return: a dictionary with the same keys as the input dict, and
372       values representing the result of the checks
373
374   """
375   result = {}
376
377   if constants.NV_HYPERVISOR in what:
378     result[constants.NV_HYPERVISOR] = tmp = {}
379     for hv_name in what[constants.NV_HYPERVISOR]:
380       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
381
382   if constants.NV_FILELIST in what:
383     result[constants.NV_FILELIST] = utils.FingerprintFiles(
384       what[constants.NV_FILELIST])
385
386   if constants.NV_NODELIST in what:
387     result[constants.NV_NODELIST] = tmp = {}
388     random.shuffle(what[constants.NV_NODELIST])
389     for node in what[constants.NV_NODELIST]:
390       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
391       if not success:
392         tmp[node] = message
393
394   if constants.NV_NODENETTEST in what:
395     result[constants.NV_NODENETTEST] = tmp = {}
396     my_name = utils.HostInfo().name
397     my_pip = my_sip = None
398     for name, pip, sip in what[constants.NV_NODENETTEST]:
399       if name == my_name:
400         my_pip = pip
401         my_sip = sip
402         break
403     if not my_pip:
404       tmp[my_name] = ("Can't find my own primary/secondary IP"
405                       " in the node list")
406     else:
407       port = utils.GetNodeDaemonPort()
408       for name, pip, sip in what[constants.NV_NODENETTEST]:
409         fail = []
410         if not utils.TcpPing(pip, port, source=my_pip):
411           fail.append("primary")
412         if sip != pip:
413           if not utils.TcpPing(sip, port, source=my_sip):
414             fail.append("secondary")
415         if fail:
416           tmp[name] = ("failure using the %s interface(s)" %
417                        " and ".join(fail))
418
419   if constants.NV_LVLIST in what:
420     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
421
422   if constants.NV_INSTANCELIST in what:
423     result[constants.NV_INSTANCELIST] = GetInstanceList(
424       what[constants.NV_INSTANCELIST])
425
426   if constants.NV_VGLIST in what:
427     result[constants.NV_VGLIST] = ListVolumeGroups()
428
429   if constants.NV_VERSION in what:
430     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
431                                     constants.RELEASE_VERSION)
432
433   if constants.NV_HVINFO in what:
434     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
435     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
436
437   if constants.NV_DRBDLIST in what:
438     try:
439       used_minors = bdev.DRBD8.GetUsedDevs().keys()
440     except errors.BlockDeviceError:
441       logging.warning("Can't get used minors list", exc_info=True)
442       used_minors = []
443     result[constants.NV_DRBDLIST] = used_minors
444
445   return result
446
447
448 def GetVolumeList(vg_name):
449   """Compute list of logical volumes and their size.
450
451   @type vg_name: str
452   @param vg_name: the volume group whose LVs we should list
453   @rtype: dict
454   @return:
455       dictionary of all partions (key) with value being a tuple of
456       their size (in MiB), inactive and online status::
457
458         {'test1': ('20.06', True, True)}
459
460       in case of errors, a string is returned with the error
461       details.
462
463   """
464   lvs = {}
465   sep = '|'
466   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
467                          "--separator=%s" % sep,
468                          "-olv_name,lv_size,lv_attr", vg_name])
469   if result.failed:
470     logging.error("Failed to list logical volumes, lvs output: %s",
471                   result.output)
472     return result.output
473
474   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
475   for line in result.stdout.splitlines():
476     line = line.strip()
477     match = valid_line_re.match(line)
478     if not match:
479       logging.error("Invalid line returned from lvs output: '%s'", line)
480       continue
481     name, size, attr = match.groups()
482     inactive = attr[4] == '-'
483     online = attr[5] == 'o'
484     lvs[name] = (size, inactive, online)
485
486   return lvs
487
488
489 def ListVolumeGroups():
490   """List the volume groups and their size.
491
492   @rtype: dict
493   @return: dictionary with keys volume name and values the
494       size of the volume
495
496   """
497   return utils.ListVolumeGroups()
498
499
500 def NodeVolumes():
501   """List all volumes on this node.
502
503   @rtype: list
504   @return:
505     A list of dictionaries, each having four keys:
506       - name: the logical volume name,
507       - size: the size of the logical volume
508       - dev: the physical device on which the LV lives
509       - vg: the volume group to which it belongs
510
511     In case of errors, we return an empty list and log the
512     error.
513
514     Note that since a logical volume can live on multiple physical
515     volumes, the resulting list might include a logical volume
516     multiple times.
517
518   """
519   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
520                          "--separator=|",
521                          "--options=lv_name,lv_size,devices,vg_name"])
522   if result.failed:
523     logging.error("Failed to list logical volumes, lvs output: %s",
524                   result.output)
525     return []
526
527   def parse_dev(dev):
528     if '(' in dev:
529       return dev.split('(')[0]
530     else:
531       return dev
532
533   def map_line(line):
534     return {
535       'name': line[0].strip(),
536       'size': line[1].strip(),
537       'dev': parse_dev(line[2].strip()),
538       'vg': line[3].strip(),
539     }
540
541   return [map_line(line.split('|')) for line in result.stdout.splitlines()
542           if line.count('|') >= 3]
543
544
545 def BridgesExist(bridges_list):
546   """Check if a list of bridges exist on the current node.
547
548   @rtype: boolean
549   @return: C{True} if all of them exist, C{False} otherwise
550
551   """
552   for bridge in bridges_list:
553     if not utils.BridgeExists(bridge):
554       return False
555
556   return True
557
558
559 def GetInstanceList(hypervisor_list):
560   """Provides a list of instances.
561
562   @type hypervisor_list: list
563   @param hypervisor_list: the list of hypervisors to query information
564
565   @rtype: list
566   @return: a list of all running instances on the current node
567     - instance1.example.com
568     - instance2.example.com
569
570   """
571   results = []
572   for hname in hypervisor_list:
573     try:
574       names = hypervisor.GetHypervisor(hname).ListInstances()
575       results.extend(names)
576     except errors.HypervisorError, err:
577       logging.exception("Error enumerating instances for hypevisor %s", hname)
578       # FIXME: should we somehow not propagate this to the master?
579       raise
580
581   return results
582
583
584 def GetInstanceInfo(instance, hname):
585   """Gives back the informations about an instance as a dictionary.
586
587   @type instance: string
588   @param instance: the instance name
589   @type hname: string
590   @param hname: the hypervisor type of the instance
591
592   @rtype: dict
593   @return: dictionary with the following keys:
594       - memory: memory size of instance (int)
595       - state: xen state of instance (string)
596       - time: cpu time of instance (float)
597
598   """
599   output = {}
600
601   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
602   if iinfo is not None:
603     output['memory'] = iinfo[2]
604     output['state'] = iinfo[4]
605     output['time'] = iinfo[5]
606
607   return output
608
609
610 def GetInstanceMigratable(instance):
611   """Gives whether an instance can be migrated.
612
613   @type instance: L{objects.Instance}
614   @param instance: object representing the instance to be checked.
615
616   @rtype: tuple
617   @return: tuple of (result, description) where:
618       - result: whether the instance can be migrated or not
619       - description: a description of the issue, if relevant
620
621   """
622   hyper = hypervisor.GetHypervisor(instance.hypervisor)
623   if instance.name not in hyper.ListInstances():
624     return (False, 'not running')
625
626   for idx in range(len(instance.disks)):
627     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
628     if not os.path.islink(link_name):
629       return (False, 'not restarted since ganeti 1.2.5')
630
631   return (True, '')
632
633
634 def GetAllInstancesInfo(hypervisor_list):
635   """Gather data about all instances.
636
637   This is the equivalent of L{GetInstanceInfo}, except that it
638   computes data for all instances at once, thus being faster if one
639   needs data about more than one instance.
640
641   @type hypervisor_list: list
642   @param hypervisor_list: list of hypervisors to query for instance data
643
644   @rtype: dict
645   @return: dictionary of instance: data, with data having the following keys:
646       - memory: memory size of instance (int)
647       - state: xen state of instance (string)
648       - time: cpu time of instance (float)
649       - vcpus: the number of vcpus
650
651   """
652   output = {}
653
654   for hname in hypervisor_list:
655     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
656     if iinfo:
657       for name, inst_id, memory, vcpus, state, times in iinfo:
658         value = {
659           'memory': memory,
660           'vcpus': vcpus,
661           'state': state,
662           'time': times,
663           }
664         if name in output and output[name] != value:
665           raise errors.HypervisorError("Instance %s running duplicate"
666                                        " with different parameters" % name)
667         output[name] = value
668
669   return output
670
671
672 def InstanceOsAdd(instance):
673   """Add an OS to an instance.
674
675   @type instance: L{objects.Instance}
676   @param instance: Instance whose OS is to be installed
677   @rtype: boolean
678   @return: the success of the operation
679
680   """
681   try:
682     inst_os = OSFromDisk(instance.os)
683   except errors.InvalidOS, err:
684     os_name, os_dir, os_err = err.args
685     if os_dir is None:
686       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
687     else:
688       return (False, "Error parsing OS '%s' in directory %s: %s" %
689               (os_name, os_dir, os_err))
690
691   create_env = OSEnvironment(instance)
692
693   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
694                                      instance.name, int(time.time()))
695
696   result = utils.RunCmd([inst_os.create_script], env=create_env,
697                         cwd=inst_os.path, output=logfile,)
698   if result.failed:
699     logging.error("os create command '%s' returned error: %s, logfile: %s,"
700                   " output: %s", result.cmd, result.fail_reason, logfile,
701                   result.output)
702     lines = [utils.SafeEncode(val)
703              for val in utils.TailFile(logfile, lines=20)]
704     return (False, "OS create script failed (%s), last lines in the"
705             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
706
707   return (True, "Successfully installed")
708
709
710 def RunRenameInstance(instance, old_name):
711   """Run the OS rename script for an instance.
712
713   @type instance: L{objects.Instance}
714   @param instance: Instance whose OS is to be installed
715   @type old_name: string
716   @param old_name: previous instance name
717   @rtype: boolean
718   @return: the success of the operation
719
720   """
721   inst_os = OSFromDisk(instance.os)
722
723   rename_env = OSEnvironment(instance)
724   rename_env['OLD_INSTANCE_NAME'] = old_name
725
726   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
727                                            old_name,
728                                            instance.name, int(time.time()))
729
730   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
731                         cwd=inst_os.path, output=logfile)
732
733   if result.failed:
734     logging.error("os create command '%s' returned error: %s output: %s",
735                   result.cmd, result.fail_reason, result.output)
736     lines = [utils.SafeEncode(val)
737              for val in utils.TailFile(logfile, lines=20)]
738     return (False, "OS rename script failed (%s), last lines in the"
739             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
740
741   return (True, "Rename successful")
742
743
744 def _GetVGInfo(vg_name):
745   """Get informations about the volume group.
746
747   @type vg_name: str
748   @param vg_name: the volume group which we query
749   @rtype: dict
750   @return:
751     A dictionary with the following keys:
752       - C{vg_size} is the total size of the volume group in MiB
753       - C{vg_free} is the free size of the volume group in MiB
754       - C{pv_count} are the number of physical disks in that VG
755
756     If an error occurs during gathering of data, we return the same dict
757     with keys all set to None.
758
759   """
760   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
761
762   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
763                          "--nosuffix", "--units=m", "--separator=:", vg_name])
764
765   if retval.failed:
766     logging.error("volume group %s not present", vg_name)
767     return retdic
768   valarr = retval.stdout.strip().rstrip(':').split(':')
769   if len(valarr) == 3:
770     try:
771       retdic = {
772         "vg_size": int(round(float(valarr[0]), 0)),
773         "vg_free": int(round(float(valarr[1]), 0)),
774         "pv_count": int(valarr[2]),
775         }
776     except ValueError, err:
777       logging.exception("Fail to parse vgs output")
778   else:
779     logging.error("vgs output has the wrong number of fields (expected"
780                   " three): %s", str(valarr))
781   return retdic
782
783
784 def _GetBlockDevSymlinkPath(instance_name, idx):
785   return os.path.join(constants.DISK_LINKS_DIR,
786                       "%s:%d" % (instance_name, idx))
787
788
789 def _SymlinkBlockDev(instance_name, device_path, idx):
790   """Set up symlinks to a instance's block device.
791
792   This is an auxiliary function run when an instance is start (on the primary
793   node) or when an instance is migrated (on the target node).
794
795
796   @param instance_name: the name of the target instance
797   @param device_path: path of the physical block device, on the node
798   @param idx: the disk index
799   @return: absolute path to the disk's symlink
800
801   """
802   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
803   try:
804     os.symlink(device_path, link_name)
805   except OSError, err:
806     if err.errno == errno.EEXIST:
807       if (not os.path.islink(link_name) or
808           os.readlink(link_name) != device_path):
809         os.remove(link_name)
810         os.symlink(device_path, link_name)
811     else:
812       raise
813
814   return link_name
815
816
817 def _RemoveBlockDevLinks(instance_name, disks):
818   """Remove the block device symlinks belonging to the given instance.
819
820   """
821   for idx, disk in enumerate(disks):
822     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
823     if os.path.islink(link_name):
824       try:
825         os.remove(link_name)
826       except OSError:
827         logging.exception("Can't remove symlink '%s'", link_name)
828
829
830 def _GatherAndLinkBlockDevs(instance):
831   """Set up an instance's block device(s).
832
833   This is run on the primary node at instance startup. The block
834   devices must be already assembled.
835
836   @type instance: L{objects.Instance}
837   @param instance: the instance whose disks we shoul assemble
838   @rtype: list
839   @return: list of (disk_object, device_path)
840
841   """
842   block_devices = []
843   for idx, disk in enumerate(instance.disks):
844     device = _RecursiveFindBD(disk)
845     if device is None:
846       raise errors.BlockDeviceError("Block device '%s' is not set up." %
847                                     str(disk))
848     device.Open()
849     try:
850       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
851     except OSError, e:
852       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
853                                     e.strerror)
854
855     block_devices.append((disk, link_name))
856
857   return block_devices
858
859
860 def StartInstance(instance, extra_args):
861   """Start an instance.
862
863   @type instance: L{objects.Instance}
864   @param instance: the instance object
865   @rtype: boolean
866   @return: whether the startup was successful or not
867
868   """
869   running_instances = GetInstanceList([instance.hypervisor])
870
871   if instance.name in running_instances:
872     return (True, "Already running")
873
874   try:
875     block_devices = _GatherAndLinkBlockDevs(instance)
876     hyper = hypervisor.GetHypervisor(instance.hypervisor)
877     hyper.StartInstance(instance, block_devices, extra_args)
878   except errors.BlockDeviceError, err:
879     logging.exception("Failed to start instance")
880     return (False, "Block device error: %s" % str(err))
881   except errors.HypervisorError, err:
882     logging.exception("Failed to start instance")
883     _RemoveBlockDevLinks(instance.name, instance.disks)
884     return (False, "Hypervisor error: %s" % str(err))
885
886   return (True, "Instance started successfully")
887
888
889 def InstanceShutdown(instance):
890   """Shut an instance down.
891
892   @note: this functions uses polling with a hardcoded timeout.
893
894   @type instance: L{objects.Instance}
895   @param instance: the instance object
896   @rtype: boolean
897   @return: whether the startup was successful or not
898
899   """
900   hv_name = instance.hypervisor
901   running_instances = GetInstanceList([hv_name])
902
903   if instance.name not in running_instances:
904     return (True, "Instance already stopped")
905
906   hyper = hypervisor.GetHypervisor(hv_name)
907   try:
908     hyper.StopInstance(instance)
909   except errors.HypervisorError, err:
910     msg = "Failed to stop instance %s: %s" % (instance.name, err)
911     logging.error(msg)
912     return (False, msg)
913
914   # test every 10secs for 2min
915
916   time.sleep(1)
917   for dummy in range(11):
918     if instance.name not in GetInstanceList([hv_name]):
919       break
920     time.sleep(10)
921   else:
922     # the shutdown did not succeed
923     logging.error("Shutdown of '%s' unsuccessful, using destroy",
924                   instance.name)
925
926     try:
927       hyper.StopInstance(instance, force=True)
928     except errors.HypervisorError, err:
929       msg = "Failed to force stop instance %s: %s" % (instance.name, err)
930       logging.error(msg)
931       return (False, msg)
932
933     time.sleep(1)
934     if instance.name in GetInstanceList([hv_name]):
935       msg = ("Could not shutdown instance %s even by destroy" %
936              instance.name)
937       logging.error(msg)
938       return (False, msg)
939
940   _RemoveBlockDevLinks(instance.name, instance.disks)
941
942   return (True, "Instance has been shutdown successfully")
943
944
945 def InstanceReboot(instance, reboot_type, extra_args):
946   """Reboot an instance.
947
948   @type instance: L{objects.Instance}
949   @param instance: the instance object to reboot
950   @type reboot_type: str
951   @param reboot_type: the type of reboot, one the following
952     constants:
953       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
954         instance OS, do not recreate the VM
955       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
956         restart the VM (at the hypervisor level)
957       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
958         is not accepted here, since that mode is handled
959         differently
960   @rtype: boolean
961   @return: the success of the operation
962
963   """
964   running_instances = GetInstanceList([instance.hypervisor])
965
966   if instance.name not in running_instances:
967     msg = "Cannot reboot instance %s that is not running" % instance.name
968     logging.error(msg)
969     return (False, msg)
970
971   hyper = hypervisor.GetHypervisor(instance.hypervisor)
972   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
973     try:
974       hyper.RebootInstance(instance)
975     except errors.HypervisorError, err:
976       msg = "Failed to soft reboot instance %s: %s" % (instance.name, err)
977       logging.error(msg)
978       return (False, msg)
979   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
980     try:
981       InstanceShutdown(instance)
982       StartInstance(instance, extra_args)
983     except errors.HypervisorError, err:
984       msg = "Failed to hard reboot instance %s: %s" % (instance.name, err)
985       logging.error(msg)
986       return (False, msg)
987   else:
988     return (False, "Invalid reboot_type received: %s" % (reboot_type,))
989
990   return (True, "Reboot successful")
991
992
993 def MigrationInfo(instance):
994   """Gather information about an instance to be migrated.
995
996   @type instance: L{objects.Instance}
997   @param instance: the instance definition
998
999   """
1000   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1001   try:
1002     info = hyper.MigrationInfo(instance)
1003   except errors.HypervisorError, err:
1004     msg = "Failed to fetch migration information"
1005     logging.exception(msg)
1006     return (False, '%s: %s' % (msg, err))
1007   return (True, info)
1008
1009
1010 def AcceptInstance(instance, info, target):
1011   """Prepare the node to accept an instance.
1012
1013   @type instance: L{objects.Instance}
1014   @param instance: the instance definition
1015   @type info: string/data (opaque)
1016   @param info: migration information, from the source node
1017   @type target: string
1018   @param target: target host (usually ip), on this node
1019
1020   """
1021   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1022   try:
1023     hyper.AcceptInstance(instance, info, target)
1024   except errors.HypervisorError, err:
1025     msg = "Failed to accept instance"
1026     logging.exception(msg)
1027     return (False, '%s: %s' % (msg, err))
1028   return (True, "Accept successfull")
1029
1030
1031 def FinalizeMigration(instance, info, success):
1032   """Finalize any preparation to accept an instance.
1033
1034   @type instance: L{objects.Instance}
1035   @param instance: the instance definition
1036   @type info: string/data (opaque)
1037   @param info: migration information, from the source node
1038   @type success: boolean
1039   @param success: whether the migration was a success or a failure
1040
1041   """
1042   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1043   try:
1044     hyper.FinalizeMigration(instance, info, success)
1045   except errors.HypervisorError, err:
1046     msg = "Failed to finalize migration"
1047     logging.exception(msg)
1048     return (False, '%s: %s' % (msg, err))
1049   return (True, "Migration Finalized")
1050
1051
1052 def MigrateInstance(instance, target, live):
1053   """Migrates an instance to another node.
1054
1055   @type instance: L{objects.Instance}
1056   @param instance: the instance definition
1057   @type target: string
1058   @param target: the target node name
1059   @type live: boolean
1060   @param live: whether the migration should be done live or not (the
1061       interpretation of this parameter is left to the hypervisor)
1062   @rtype: tuple
1063   @return: a tuple of (success, msg) where:
1064       - succes is a boolean denoting the success/failure of the operation
1065       - msg is a string with details in case of failure
1066
1067   """
1068   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1069
1070   try:
1071     hyper.MigrateInstance(instance.name, target, live)
1072   except errors.HypervisorError, err:
1073     msg = "Failed to migrate instance"
1074     logging.exception(msg)
1075     return (False, "%s: %s" % (msg, err))
1076   return (True, "Migration successfull")
1077
1078
1079 def BlockdevCreate(disk, size, owner, on_primary, info):
1080   """Creates a block device for an instance.
1081
1082   @type disk: L{objects.Disk}
1083   @param disk: the object describing the disk we should create
1084   @type size: int
1085   @param size: the size of the physical underlying device, in MiB
1086   @type owner: str
1087   @param owner: the name of the instance for which disk is created,
1088       used for device cache data
1089   @type on_primary: boolean
1090   @param on_primary:  indicates if it is the primary node or not
1091   @type info: string
1092   @param info: string that will be sent to the physical device
1093       creation, used for example to set (LVM) tags on LVs
1094
1095   @return: the new unique_id of the device (this can sometime be
1096       computed only after creation), or None. On secondary nodes,
1097       it's not required to return anything.
1098
1099   """
1100   clist = []
1101   if disk.children:
1102     for child in disk.children:
1103       try:
1104         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1105       except errors.BlockDeviceError, err:
1106         errmsg = "Can't assemble device %s: %s" % (child, err)
1107         logging.error(errmsg)
1108         return False, errmsg
1109       if on_primary or disk.AssembleOnSecondary():
1110         # we need the children open in case the device itself has to
1111         # be assembled
1112         try:
1113           crdev.Open()
1114         except errors.BlockDeviceError, err:
1115           errmsg = "Can't make child '%s' read-write: %s" % (child, err)
1116           logging.error(errmsg)
1117           return False, errmsg
1118       clist.append(crdev)
1119
1120   try:
1121     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1122   except errors.BlockDeviceError, err:
1123     return False, "Can't create block device: %s" % str(err)
1124
1125   if on_primary or disk.AssembleOnSecondary():
1126     try:
1127       device.Assemble()
1128     except errors.BlockDeviceError, err:
1129       errmsg = ("Can't assemble device after creation, very"
1130                 " unusual event: %s" % str(err))
1131       logging.error(errmsg)
1132       return False, errmsg
1133     device.SetSyncSpeed(constants.SYNC_SPEED)
1134     if on_primary or disk.OpenOnSecondary():
1135       try:
1136         device.Open(force=True)
1137       except errors.BlockDeviceError, err:
1138         errmsg = ("Can't make device r/w after creation, very"
1139                   " unusual event: %s" % str(err))
1140         logging.error(errmsg)
1141         return False, errmsg
1142     DevCacheManager.UpdateCache(device.dev_path, owner,
1143                                 on_primary, disk.iv_name)
1144
1145   device.SetInfo(info)
1146
1147   physical_id = device.unique_id
1148   return True, physical_id
1149
1150
1151 def BlockdevRemove(disk):
1152   """Remove a block device.
1153
1154   @note: This is intended to be called recursively.
1155
1156   @type disk: L{objects.Disk}
1157   @param disk: the disk object we should remove
1158   @rtype: boolean
1159   @return: the success of the operation
1160
1161   """
1162   msgs = []
1163   result = True
1164   try:
1165     rdev = _RecursiveFindBD(disk)
1166   except errors.BlockDeviceError, err:
1167     # probably can't attach
1168     logging.info("Can't attach to device %s in remove", disk)
1169     rdev = None
1170   if rdev is not None:
1171     r_path = rdev.dev_path
1172     try:
1173       rdev.Remove()
1174     except errors.BlockDeviceError, err:
1175       msgs.append(str(err))
1176       result = False
1177     if result:
1178       DevCacheManager.RemoveCache(r_path)
1179
1180   if disk.children:
1181     for child in disk.children:
1182       c_status, c_msg = BlockdevRemove(child)
1183       result = result and c_status
1184       if c_msg: # not an empty message
1185         msgs.append(c_msg)
1186
1187   return (result, "; ".join(msgs))
1188
1189
1190 def _RecursiveAssembleBD(disk, owner, as_primary):
1191   """Activate a block device for an instance.
1192
1193   This is run on the primary and secondary nodes for an instance.
1194
1195   @note: this function is called recursively.
1196
1197   @type disk: L{objects.Disk}
1198   @param disk: the disk we try to assemble
1199   @type owner: str
1200   @param owner: the name of the instance which owns the disk
1201   @type as_primary: boolean
1202   @param as_primary: if we should make the block device
1203       read/write
1204
1205   @return: the assembled device or None (in case no device
1206       was assembled)
1207   @raise errors.BlockDeviceError: in case there is an error
1208       during the activation of the children or the device
1209       itself
1210
1211   """
1212   children = []
1213   if disk.children:
1214     mcn = disk.ChildrenNeeded()
1215     if mcn == -1:
1216       mcn = 0 # max number of Nones allowed
1217     else:
1218       mcn = len(disk.children) - mcn # max number of Nones
1219     for chld_disk in disk.children:
1220       try:
1221         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1222       except errors.BlockDeviceError, err:
1223         if children.count(None) >= mcn:
1224           raise
1225         cdev = None
1226         logging.error("Error in child activation (but continuing): %s",
1227                       str(err))
1228       children.append(cdev)
1229
1230   if as_primary or disk.AssembleOnSecondary():
1231     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1232     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1233     result = r_dev
1234     if as_primary or disk.OpenOnSecondary():
1235       r_dev.Open()
1236     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1237                                 as_primary, disk.iv_name)
1238
1239   else:
1240     result = True
1241   return result
1242
1243
1244 def BlockdevAssemble(disk, owner, as_primary):
1245   """Activate a block device for an instance.
1246
1247   This is a wrapper over _RecursiveAssembleBD.
1248
1249   @rtype: str or boolean
1250   @return: a C{/dev/...} path for primary nodes, and
1251       C{True} for secondary nodes
1252
1253   """
1254   status = True
1255   result = "no error information"
1256   try:
1257     result = _RecursiveAssembleBD(disk, owner, as_primary)
1258     if isinstance(result, bdev.BlockDev):
1259       result = result.dev_path
1260   except errors.BlockDeviceError, err:
1261     result = "Error while assembling disk: %s" % str(err)
1262     status = False
1263   return (status, result)
1264
1265
1266 def BlockdevShutdown(disk):
1267   """Shut down a block device.
1268
1269   First, if the device is assembled (Attach() is successfull), then
1270   the device is shutdown. Then the children of the device are
1271   shutdown.
1272
1273   This function is called recursively. Note that we don't cache the
1274   children or such, as oppossed to assemble, shutdown of different
1275   devices doesn't require that the upper device was active.
1276
1277   @type disk: L{objects.Disk}
1278   @param disk: the description of the disk we should
1279       shutdown
1280   @rtype: boolean
1281   @return: the success of the operation
1282
1283   """
1284   msgs = []
1285   result = True
1286   r_dev = _RecursiveFindBD(disk)
1287   if r_dev is not None:
1288     r_path = r_dev.dev_path
1289     try:
1290       r_dev.Shutdown()
1291       DevCacheManager.RemoveCache(r_path)
1292     except errors.BlockDeviceError, err:
1293       msgs.append(str(err))
1294       result = False
1295
1296   if disk.children:
1297     for child in disk.children:
1298       c_status, c_msg = BlockdevShutdown(child)
1299       result = result and c_status
1300       if c_msg: # not an empty message
1301         msgs.append(c_msg)
1302
1303   return (result, "; ".join(msgs))
1304
1305
1306 def BlockdevAddchildren(parent_cdev, new_cdevs):
1307   """Extend a mirrored block device.
1308
1309   @type parent_cdev: L{objects.Disk}
1310   @param parent_cdev: the disk to which we should add children
1311   @type new_cdevs: list of L{objects.Disk}
1312   @param new_cdevs: the list of children which we should add
1313   @rtype: boolean
1314   @return: the success of the operation
1315
1316   """
1317   parent_bdev = _RecursiveFindBD(parent_cdev)
1318   if parent_bdev is None:
1319     logging.error("Can't find parent device")
1320     return False
1321   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1322   if new_bdevs.count(None) > 0:
1323     logging.error("Can't find new device(s) to add: %s:%s",
1324                   new_bdevs, new_cdevs)
1325     return False
1326   parent_bdev.AddChildren(new_bdevs)
1327   return True
1328
1329
1330 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1331   """Shrink a mirrored block device.
1332
1333   @type parent_cdev: L{objects.Disk}
1334   @param parent_cdev: the disk from which we should remove children
1335   @type new_cdevs: list of L{objects.Disk}
1336   @param new_cdevs: the list of children which we should remove
1337   @rtype: boolean
1338   @return: the success of the operation
1339
1340   """
1341   parent_bdev = _RecursiveFindBD(parent_cdev)
1342   if parent_bdev is None:
1343     logging.error("Can't find parent in remove children: %s", parent_cdev)
1344     return False
1345   devs = []
1346   for disk in new_cdevs:
1347     rpath = disk.StaticDevPath()
1348     if rpath is None:
1349       bd = _RecursiveFindBD(disk)
1350       if bd is None:
1351         logging.error("Can't find dynamic device %s while removing children",
1352                       disk)
1353         return False
1354       else:
1355         devs.append(bd.dev_path)
1356     else:
1357       devs.append(rpath)
1358   parent_bdev.RemoveChildren(devs)
1359   return True
1360
1361
1362 def BlockdevGetmirrorstatus(disks):
1363   """Get the mirroring status of a list of devices.
1364
1365   @type disks: list of L{objects.Disk}
1366   @param disks: the list of disks which we should query
1367   @rtype: disk
1368   @return:
1369       a list of (mirror_done, estimated_time) tuples, which
1370       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1371   @raise errors.BlockDeviceError: if any of the disks cannot be
1372       found
1373
1374   """
1375   stats = []
1376   for dsk in disks:
1377     rbd = _RecursiveFindBD(dsk)
1378     if rbd is None:
1379       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1380     stats.append(rbd.CombinedSyncStatus())
1381   return stats
1382
1383
1384 def _RecursiveFindBD(disk):
1385   """Check if a device is activated.
1386
1387   If so, return informations about the real device.
1388
1389   @type disk: L{objects.Disk}
1390   @param disk: the disk object we need to find
1391
1392   @return: None if the device can't be found,
1393       otherwise the device instance
1394
1395   """
1396   children = []
1397   if disk.children:
1398     for chdisk in disk.children:
1399       children.append(_RecursiveFindBD(chdisk))
1400
1401   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1402
1403
1404 def BlockdevFind(disk):
1405   """Check if a device is activated.
1406
1407   If it is, return informations about the real device.
1408
1409   @type disk: L{objects.Disk}
1410   @param disk: the disk to find
1411   @rtype: None or tuple
1412   @return: None if the disk cannot be found, otherwise a
1413       tuple (device_path, major, minor, sync_percent,
1414       estimated_time, is_degraded)
1415
1416   """
1417   try:
1418     rbd = _RecursiveFindBD(disk)
1419   except errors.BlockDeviceError, err:
1420     return (False, str(err))
1421   if rbd is None:
1422     return (True, None)
1423   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1424
1425
1426 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1427   """Write a file to the filesystem.
1428
1429   This allows the master to overwrite(!) a file. It will only perform
1430   the operation if the file belongs to a list of configuration files.
1431
1432   @type file_name: str
1433   @param file_name: the target file name
1434   @type data: str
1435   @param data: the new contents of the file
1436   @type mode: int
1437   @param mode: the mode to give the file (can be None)
1438   @type uid: int
1439   @param uid: the owner of the file (can be -1 for default)
1440   @type gid: int
1441   @param gid: the group of the file (can be -1 for default)
1442   @type atime: float
1443   @param atime: the atime to set on the file (can be None)
1444   @type mtime: float
1445   @param mtime: the mtime to set on the file (can be None)
1446   @rtype: boolean
1447   @return: the success of the operation; errors are logged
1448       in the node daemon log
1449
1450   """
1451   if not os.path.isabs(file_name):
1452     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1453                   file_name)
1454     return False
1455
1456   allowed_files = [
1457     constants.CLUSTER_CONF_FILE,
1458     constants.ETC_HOSTS,
1459     constants.SSH_KNOWN_HOSTS_FILE,
1460     constants.VNC_PASSWORD_FILE,
1461     ]
1462
1463   if file_name not in allowed_files:
1464     logging.error("Filename passed to UploadFile not in allowed"
1465                  " upload targets: '%s'", file_name)
1466     return False
1467
1468   raw_data = _Decompress(data)
1469
1470   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1471                   atime=atime, mtime=mtime)
1472   return True
1473
1474
1475 def WriteSsconfFiles(values):
1476   """Update all ssconf files.
1477
1478   Wrapper around the SimpleStore.WriteFiles.
1479
1480   """
1481   ssconf.SimpleStore().WriteFiles(values)
1482
1483
1484 def _ErrnoOrStr(err):
1485   """Format an EnvironmentError exception.
1486
1487   If the L{err} argument has an errno attribute, it will be looked up
1488   and converted into a textual C{E...} description. Otherwise the
1489   string representation of the error will be returned.
1490
1491   @type err: L{EnvironmentError}
1492   @param err: the exception to format
1493
1494   """
1495   if hasattr(err, 'errno'):
1496     detail = errno.errorcode[err.errno]
1497   else:
1498     detail = str(err)
1499   return detail
1500
1501
1502 def _OSOndiskVersion(name, os_dir):
1503   """Compute and return the API version of a given OS.
1504
1505   This function will try to read the API version of the OS given by
1506   the 'name' parameter and residing in the 'os_dir' directory.
1507
1508   @type name: str
1509   @param name: the OS name we should look for
1510   @type os_dir: str
1511   @param os_dir: the directory inwhich we should look for the OS
1512   @rtype: int or None
1513   @return:
1514       Either an integer denoting the version or None in the
1515       case when this is not a valid OS name.
1516   @raise errors.InvalidOS: if the OS cannot be found
1517
1518   """
1519   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1520
1521   try:
1522     st = os.stat(api_file)
1523   except EnvironmentError, err:
1524     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1525                            " found (%s)" % _ErrnoOrStr(err))
1526
1527   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1528     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1529                            " a regular file")
1530
1531   try:
1532     f = open(api_file)
1533     try:
1534       api_versions = f.readlines()
1535     finally:
1536       f.close()
1537   except EnvironmentError, err:
1538     raise errors.InvalidOS(name, os_dir, "error while reading the"
1539                            " API version (%s)" % _ErrnoOrStr(err))
1540
1541   api_versions = [version.strip() for version in api_versions]
1542   try:
1543     api_versions = [int(version) for version in api_versions]
1544   except (TypeError, ValueError), err:
1545     raise errors.InvalidOS(name, os_dir,
1546                            "API version is not integer (%s)" % str(err))
1547
1548   return api_versions
1549
1550
1551 def DiagnoseOS(top_dirs=None):
1552   """Compute the validity for all OSes.
1553
1554   @type top_dirs: list
1555   @param top_dirs: the list of directories in which to
1556       search (if not given defaults to
1557       L{constants.OS_SEARCH_PATH})
1558   @rtype: list of L{objects.OS}
1559   @return: an OS object for each name in all the given
1560       directories
1561
1562   """
1563   if top_dirs is None:
1564     top_dirs = constants.OS_SEARCH_PATH
1565
1566   result = []
1567   for dir_name in top_dirs:
1568     if os.path.isdir(dir_name):
1569       try:
1570         f_names = utils.ListVisibleFiles(dir_name)
1571       except EnvironmentError, err:
1572         logging.exception("Can't list the OS directory %s", dir_name)
1573         break
1574       for name in f_names:
1575         try:
1576           os_inst = OSFromDisk(name, base_dir=dir_name)
1577           result.append(os_inst)
1578         except errors.InvalidOS, err:
1579           result.append(objects.OS.FromInvalidOS(err))
1580
1581   return result
1582
1583
1584 def OSFromDisk(name, base_dir=None):
1585   """Create an OS instance from disk.
1586
1587   This function will return an OS instance if the given name is a
1588   valid OS name. Otherwise, it will raise an appropriate
1589   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1590
1591   @type base_dir: string
1592   @keyword base_dir: Base directory containing OS installations.
1593                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1594   @rtype: L{objects.OS}
1595   @return: the OS instance if we find a valid one
1596   @raise errors.InvalidOS: if we don't find a valid OS
1597
1598   """
1599   if base_dir is None:
1600     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1601     if os_dir is None:
1602       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1603   else:
1604     os_dir = os.path.sep.join([base_dir, name])
1605
1606   api_versions = _OSOndiskVersion(name, os_dir)
1607
1608   if constants.OS_API_VERSION not in api_versions:
1609     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1610                            " (found %s want %s)"
1611                            % (api_versions, constants.OS_API_VERSION))
1612
1613   # OS Scripts dictionary, we will populate it with the actual script names
1614   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1615
1616   for script in os_scripts:
1617     os_scripts[script] = os.path.sep.join([os_dir, script])
1618
1619     try:
1620       st = os.stat(os_scripts[script])
1621     except EnvironmentError, err:
1622       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1623                              (script, _ErrnoOrStr(err)))
1624
1625     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1626       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1627                              script)
1628
1629     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1630       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1631                              script)
1632
1633
1634   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1635                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1636                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1637                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1638                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1639                     api_versions=api_versions)
1640
1641 def OSEnvironment(instance, debug=0):
1642   """Calculate the environment for an os script.
1643
1644   @type instance: L{objects.Instance}
1645   @param instance: target instance for the os script run
1646   @type debug: integer
1647   @param debug: debug level (0 or 1, for OS Api 10)
1648   @rtype: dict
1649   @return: dict of environment variables
1650   @raise errors.BlockDeviceError: if the block device
1651       cannot be found
1652
1653   """
1654   result = {}
1655   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1656   result['INSTANCE_NAME'] = instance.name
1657   result['INSTANCE_OS'] = instance.os
1658   result['HYPERVISOR'] = instance.hypervisor
1659   result['DISK_COUNT'] = '%d' % len(instance.disks)
1660   result['NIC_COUNT'] = '%d' % len(instance.nics)
1661   result['DEBUG_LEVEL'] = '%d' % debug
1662   for idx, disk in enumerate(instance.disks):
1663     real_disk = _RecursiveFindBD(disk)
1664     if real_disk is None:
1665       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1666                                     str(disk))
1667     real_disk.Open()
1668     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1669     # FIXME: When disks will have read-only mode, populate this
1670     result['DISK_%d_ACCESS' % idx] = disk.mode
1671     if constants.HV_DISK_TYPE in instance.hvparams:
1672       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1673         instance.hvparams[constants.HV_DISK_TYPE]
1674     if disk.dev_type in constants.LDS_BLOCK:
1675       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1676     elif disk.dev_type == constants.LD_FILE:
1677       result['DISK_%d_BACKEND_TYPE' % idx] = \
1678         'file:%s' % disk.physical_id[0]
1679   for idx, nic in enumerate(instance.nics):
1680     result['NIC_%d_MAC' % idx] = nic.mac
1681     if nic.ip:
1682       result['NIC_%d_IP' % idx] = nic.ip
1683     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1684     if constants.HV_NIC_TYPE in instance.hvparams:
1685       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1686         instance.hvparams[constants.HV_NIC_TYPE]
1687
1688   return result
1689
1690 def BlockdevGrow(disk, amount):
1691   """Grow a stack of block devices.
1692
1693   This function is called recursively, with the childrens being the
1694   first ones to resize.
1695
1696   @type disk: L{objects.Disk}
1697   @param disk: the disk to be grown
1698   @rtype: (status, result)
1699   @return: a tuple with the status of the operation
1700       (True/False), and the errors message if status
1701       is False
1702
1703   """
1704   r_dev = _RecursiveFindBD(disk)
1705   if r_dev is None:
1706     return False, "Cannot find block device %s" % (disk,)
1707
1708   try:
1709     r_dev.Grow(amount)
1710   except errors.BlockDeviceError, err:
1711     return False, str(err)
1712
1713   return True, None
1714
1715
1716 def BlockdevSnapshot(disk):
1717   """Create a snapshot copy of a block device.
1718
1719   This function is called recursively, and the snapshot is actually created
1720   just for the leaf lvm backend device.
1721
1722   @type disk: L{objects.Disk}
1723   @param disk: the disk to be snapshotted
1724   @rtype: string
1725   @return: snapshot disk path
1726
1727   """
1728   if disk.children:
1729     if len(disk.children) == 1:
1730       # only one child, let's recurse on it
1731       return BlockdevSnapshot(disk.children[0])
1732     else:
1733       # more than one child, choose one that matches
1734       for child in disk.children:
1735         if child.size == disk.size:
1736           # return implies breaking the loop
1737           return BlockdevSnapshot(child)
1738   elif disk.dev_type == constants.LD_LV:
1739     r_dev = _RecursiveFindBD(disk)
1740     if r_dev is not None:
1741       # let's stay on the safe side and ask for the full size, for now
1742       return r_dev.Snapshot(disk.size)
1743     else:
1744       return None
1745   else:
1746     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1747                                  " '%s' of type '%s'" %
1748                                  (disk.unique_id, disk.dev_type))
1749
1750
1751 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1752   """Export a block device snapshot to a remote node.
1753
1754   @type disk: L{objects.Disk}
1755   @param disk: the description of the disk to export
1756   @type dest_node: str
1757   @param dest_node: the destination node to export to
1758   @type instance: L{objects.Instance}
1759   @param instance: the instance object to whom the disk belongs
1760   @type cluster_name: str
1761   @param cluster_name: the cluster name, needed for SSH hostalias
1762   @type idx: int
1763   @param idx: the index of the disk in the instance's disk list,
1764       used to export to the OS scripts environment
1765   @rtype: boolean
1766   @return: the success of the operation
1767
1768   """
1769   export_env = OSEnvironment(instance)
1770
1771   inst_os = OSFromDisk(instance.os)
1772   export_script = inst_os.export_script
1773
1774   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1775                                      instance.name, int(time.time()))
1776   if not os.path.exists(constants.LOG_OS_DIR):
1777     os.mkdir(constants.LOG_OS_DIR, 0750)
1778   real_disk = _RecursiveFindBD(disk)
1779   if real_disk is None:
1780     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1781                                   str(disk))
1782   real_disk.Open()
1783
1784   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1785   export_env['EXPORT_INDEX'] = str(idx)
1786
1787   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1788   destfile = disk.physical_id[1]
1789
1790   # the target command is built out of three individual commands,
1791   # which are joined by pipes; we check each individual command for
1792   # valid parameters
1793   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1794                                export_script, logfile)
1795
1796   comprcmd = "gzip"
1797
1798   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1799                                 destdir, destdir, destfile)
1800   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1801                                                    constants.GANETI_RUNAS,
1802                                                    destcmd)
1803
1804   # all commands have been checked, so we're safe to combine them
1805   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1806
1807   result = utils.RunCmd(command, env=export_env)
1808
1809   if result.failed:
1810     logging.error("os snapshot export command '%s' returned error: %s"
1811                   " output: %s", command, result.fail_reason, result.output)
1812     return False
1813
1814   return True
1815
1816
1817 def FinalizeExport(instance, snap_disks):
1818   """Write out the export configuration information.
1819
1820   @type instance: L{objects.Instance}
1821   @param instance: the instance which we export, used for
1822       saving configuration
1823   @type snap_disks: list of L{objects.Disk}
1824   @param snap_disks: list of snapshot block devices, which
1825       will be used to get the actual name of the dump file
1826
1827   @rtype: boolean
1828   @return: the success of the operation
1829
1830   """
1831   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1832   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1833
1834   config = objects.SerializableConfigParser()
1835
1836   config.add_section(constants.INISECT_EXP)
1837   config.set(constants.INISECT_EXP, 'version', '0')
1838   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1839   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1840   config.set(constants.INISECT_EXP, 'os', instance.os)
1841   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1842
1843   config.add_section(constants.INISECT_INS)
1844   config.set(constants.INISECT_INS, 'name', instance.name)
1845   config.set(constants.INISECT_INS, 'memory', '%d' %
1846              instance.beparams[constants.BE_MEMORY])
1847   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1848              instance.beparams[constants.BE_VCPUS])
1849   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1850
1851   nic_total = 0
1852   for nic_count, nic in enumerate(instance.nics):
1853     nic_total += 1
1854     config.set(constants.INISECT_INS, 'nic%d_mac' %
1855                nic_count, '%s' % nic.mac)
1856     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1857     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1858                '%s' % nic.bridge)
1859   # TODO: redundant: on load can read nics until it doesn't exist
1860   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1861
1862   disk_total = 0
1863   for disk_count, disk in enumerate(snap_disks):
1864     if disk:
1865       disk_total += 1
1866       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1867                  ('%s' % disk.iv_name))
1868       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1869                  ('%s' % disk.physical_id[1]))
1870       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1871                  ('%d' % disk.size))
1872
1873   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1874
1875   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1876                   data=config.Dumps())
1877   shutil.rmtree(finaldestdir, True)
1878   shutil.move(destdir, finaldestdir)
1879
1880   return True
1881
1882
1883 def ExportInfo(dest):
1884   """Get export configuration information.
1885
1886   @type dest: str
1887   @param dest: directory containing the export
1888
1889   @rtype: L{objects.SerializableConfigParser}
1890   @return: a serializable config file containing the
1891       export info
1892
1893   """
1894   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1895
1896   config = objects.SerializableConfigParser()
1897   config.read(cff)
1898
1899   if (not config.has_section(constants.INISECT_EXP) or
1900       not config.has_section(constants.INISECT_INS)):
1901     return None
1902
1903   return config
1904
1905
1906 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1907   """Import an os image into an instance.
1908
1909   @type instance: L{objects.Instance}
1910   @param instance: instance to import the disks into
1911   @type src_node: string
1912   @param src_node: source node for the disk images
1913   @type src_images: list of string
1914   @param src_images: absolute paths of the disk images
1915   @rtype: list of boolean
1916   @return: each boolean represent the success of importing the n-th disk
1917
1918   """
1919   import_env = OSEnvironment(instance)
1920   inst_os = OSFromDisk(instance.os)
1921   import_script = inst_os.import_script
1922
1923   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1924                                         instance.name, int(time.time()))
1925   if not os.path.exists(constants.LOG_OS_DIR):
1926     os.mkdir(constants.LOG_OS_DIR, 0750)
1927
1928   comprcmd = "gunzip"
1929   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1930                                import_script, logfile)
1931
1932   final_result = []
1933   for idx, image in enumerate(src_images):
1934     if image:
1935       destcmd = utils.BuildShellCmd('cat %s', image)
1936       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1937                                                        constants.GANETI_RUNAS,
1938                                                        destcmd)
1939       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1940       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1941       import_env['IMPORT_INDEX'] = str(idx)
1942       result = utils.RunCmd(command, env=import_env)
1943       if result.failed:
1944         logging.error("Disk import command '%s' returned error: %s"
1945                       " output: %s", command, result.fail_reason,
1946                       result.output)
1947         final_result.append(False)
1948       else:
1949         final_result.append(True)
1950     else:
1951       final_result.append(True)
1952
1953   return final_result
1954
1955
1956 def ListExports():
1957   """Return a list of exports currently available on this machine.
1958
1959   @rtype: list
1960   @return: list of the exports
1961
1962   """
1963   if os.path.isdir(constants.EXPORT_DIR):
1964     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1965   else:
1966     return []
1967
1968
1969 def RemoveExport(export):
1970   """Remove an existing export from the node.
1971
1972   @type export: str
1973   @param export: the name of the export to remove
1974   @rtype: boolean
1975   @return: the success of the operation
1976
1977   """
1978   target = os.path.join(constants.EXPORT_DIR, export)
1979
1980   shutil.rmtree(target)
1981   # TODO: catch some of the relevant exceptions and provide a pretty
1982   # error message if rmtree fails.
1983
1984   return True
1985
1986
1987 def BlockdevRename(devlist):
1988   """Rename a list of block devices.
1989
1990   @type devlist: list of tuples
1991   @param devlist: list of tuples of the form  (disk,
1992       new_logical_id, new_physical_id); disk is an
1993       L{objects.Disk} object describing the current disk,
1994       and new logical_id/physical_id is the name we
1995       rename it to
1996   @rtype: boolean
1997   @return: True if all renames succeeded, False otherwise
1998
1999   """
2000   result = True
2001   for disk, unique_id in devlist:
2002     dev = _RecursiveFindBD(disk)
2003     if dev is None:
2004       result = False
2005       continue
2006     try:
2007       old_rpath = dev.dev_path
2008       dev.Rename(unique_id)
2009       new_rpath = dev.dev_path
2010       if old_rpath != new_rpath:
2011         DevCacheManager.RemoveCache(old_rpath)
2012         # FIXME: we should add the new cache information here, like:
2013         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2014         # but we don't have the owner here - maybe parse from existing
2015         # cache? for now, we only lose lvm data when we rename, which
2016         # is less critical than DRBD or MD
2017     except errors.BlockDeviceError, err:
2018       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2019       result = False
2020   return result
2021
2022
2023 def _TransformFileStorageDir(file_storage_dir):
2024   """Checks whether given file_storage_dir is valid.
2025
2026   Checks wheter the given file_storage_dir is within the cluster-wide
2027   default file_storage_dir stored in SimpleStore. Only paths under that
2028   directory are allowed.
2029
2030   @type file_storage_dir: str
2031   @param file_storage_dir: the path to check
2032
2033   @return: the normalized path if valid, None otherwise
2034
2035   """
2036   cfg = _GetConfig()
2037   file_storage_dir = os.path.normpath(file_storage_dir)
2038   base_file_storage_dir = cfg.GetFileStorageDir()
2039   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2040       base_file_storage_dir):
2041     logging.error("file storage directory '%s' is not under base file"
2042                   " storage directory '%s'",
2043                   file_storage_dir, base_file_storage_dir)
2044     return None
2045   return file_storage_dir
2046
2047
2048 def CreateFileStorageDir(file_storage_dir):
2049   """Create file storage directory.
2050
2051   @type file_storage_dir: str
2052   @param file_storage_dir: directory to create
2053
2054   @rtype: tuple
2055   @return: tuple with first element a boolean indicating wheter dir
2056       creation was successful or not
2057
2058   """
2059   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2060   result = True,
2061   if not file_storage_dir:
2062     result = False,
2063   else:
2064     if os.path.exists(file_storage_dir):
2065       if not os.path.isdir(file_storage_dir):
2066         logging.error("'%s' is not a directory", file_storage_dir)
2067         result = False,
2068     else:
2069       try:
2070         os.makedirs(file_storage_dir, 0750)
2071       except OSError, err:
2072         logging.error("Cannot create file storage directory '%s': %s",
2073                       file_storage_dir, err)
2074         result = False,
2075   return result
2076
2077
2078 def RemoveFileStorageDir(file_storage_dir):
2079   """Remove file storage directory.
2080
2081   Remove it only if it's empty. If not log an error and return.
2082
2083   @type file_storage_dir: str
2084   @param file_storage_dir: the directory we should cleanup
2085   @rtype: tuple (success,)
2086   @return: tuple of one element, C{success}, denoting
2087       whether the operation was successfull
2088
2089   """
2090   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2091   result = True,
2092   if not file_storage_dir:
2093     result = False,
2094   else:
2095     if os.path.exists(file_storage_dir):
2096       if not os.path.isdir(file_storage_dir):
2097         logging.error("'%s' is not a directory", file_storage_dir)
2098         result = False,
2099       # deletes dir only if empty, otherwise we want to return False
2100       try:
2101         os.rmdir(file_storage_dir)
2102       except OSError, err:
2103         logging.exception("Cannot remove file storage directory '%s'",
2104                           file_storage_dir)
2105         result = False,
2106   return result
2107
2108
2109 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2110   """Rename the file storage directory.
2111
2112   @type old_file_storage_dir: str
2113   @param old_file_storage_dir: the current path
2114   @type new_file_storage_dir: str
2115   @param new_file_storage_dir: the name we should rename to
2116   @rtype: tuple (success,)
2117   @return: tuple of one element, C{success}, denoting
2118       whether the operation was successful
2119
2120   """
2121   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2122   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2123   result = True,
2124   if not old_file_storage_dir or not new_file_storage_dir:
2125     result = False,
2126   else:
2127     if not os.path.exists(new_file_storage_dir):
2128       if os.path.isdir(old_file_storage_dir):
2129         try:
2130           os.rename(old_file_storage_dir, new_file_storage_dir)
2131         except OSError, err:
2132           logging.exception("Cannot rename '%s' to '%s'",
2133                             old_file_storage_dir, new_file_storage_dir)
2134           result =  False,
2135       else:
2136         logging.error("'%s' is not a directory", old_file_storage_dir)
2137         result = False,
2138     else:
2139       if os.path.exists(old_file_storage_dir):
2140         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2141                       old_file_storage_dir, new_file_storage_dir)
2142         result = False,
2143   return result
2144
2145
2146 def _IsJobQueueFile(file_name):
2147   """Checks whether the given filename is in the queue directory.
2148
2149   @type file_name: str
2150   @param file_name: the file name we should check
2151   @rtype: boolean
2152   @return: whether the file is under the queue directory
2153
2154   """
2155   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2156   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2157
2158   if not result:
2159     logging.error("'%s' is not a file in the queue directory",
2160                   file_name)
2161
2162   return result
2163
2164
2165 def JobQueueUpdate(file_name, content):
2166   """Updates a file in the queue directory.
2167
2168   This is just a wrapper over L{utils.WriteFile}, with proper
2169   checking.
2170
2171   @type file_name: str
2172   @param file_name: the job file name
2173   @type content: str
2174   @param content: the new job contents
2175   @rtype: boolean
2176   @return: the success of the operation
2177
2178   """
2179   if not _IsJobQueueFile(file_name):
2180     return False
2181
2182   # Write and replace the file atomically
2183   utils.WriteFile(file_name, data=_Decompress(content))
2184
2185   return True
2186
2187
2188 def JobQueueRename(old, new):
2189   """Renames a job queue file.
2190
2191   This is just a wrapper over os.rename with proper checking.
2192
2193   @type old: str
2194   @param old: the old (actual) file name
2195   @type new: str
2196   @param new: the desired file name
2197   @rtype: boolean
2198   @return: the success of the operation
2199
2200   """
2201   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2202     return False
2203
2204   utils.RenameFile(old, new, mkdir=True)
2205
2206   return True
2207
2208
2209 def JobQueueSetDrainFlag(drain_flag):
2210   """Set the drain flag for the queue.
2211
2212   This will set or unset the queue drain flag.
2213
2214   @type drain_flag: boolean
2215   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2216   @rtype: boolean
2217   @return: always True
2218   @warning: the function always returns True
2219
2220   """
2221   if drain_flag:
2222     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2223   else:
2224     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2225
2226   return True
2227
2228
2229 def BlockdevClose(instance_name, disks):
2230   """Closes the given block devices.
2231
2232   This means they will be switched to secondary mode (in case of
2233   DRBD).
2234
2235   @param instance_name: if the argument is not empty, the symlinks
2236       of this instance will be removed
2237   @type disks: list of L{objects.Disk}
2238   @param disks: the list of disks to be closed
2239   @rtype: tuple (success, message)
2240   @return: a tuple of success and message, where success
2241       indicates the succes of the operation, and message
2242       which will contain the error details in case we
2243       failed
2244
2245   """
2246   bdevs = []
2247   for cf in disks:
2248     rd = _RecursiveFindBD(cf)
2249     if rd is None:
2250       return (False, "Can't find device %s" % cf)
2251     bdevs.append(rd)
2252
2253   msg = []
2254   for rd in bdevs:
2255     try:
2256       rd.Close()
2257     except errors.BlockDeviceError, err:
2258       msg.append(str(err))
2259   if msg:
2260     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2261   else:
2262     if instance_name:
2263       _RemoveBlockDevLinks(instance_name, disks)
2264     return (True, "All devices secondary")
2265
2266
2267 def ValidateHVParams(hvname, hvparams):
2268   """Validates the given hypervisor parameters.
2269
2270   @type hvname: string
2271   @param hvname: the hypervisor name
2272   @type hvparams: dict
2273   @param hvparams: the hypervisor parameters to be validated
2274   @rtype: tuple (success, message)
2275   @return: a tuple of success and message, where success
2276       indicates the succes of the operation, and message
2277       which will contain the error details in case we
2278       failed
2279
2280   """
2281   try:
2282     hv_type = hypervisor.GetHypervisor(hvname)
2283     hv_type.ValidateParameters(hvparams)
2284     return (True, "Validation passed")
2285   except errors.HypervisorError, err:
2286     return (False, str(err))
2287
2288
2289 def DemoteFromMC():
2290   """Demotes the current node from master candidate role.
2291
2292   """
2293   # try to ensure we're not the master by mistake
2294   master, myself = ssconf.GetMasterAndMyself()
2295   if master == myself:
2296     return (False, "ssconf status shows I'm the master node, will not demote")
2297   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2298   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2299     return (False, "The master daemon is running, will not demote")
2300   try:
2301     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2302   except EnvironmentError, err:
2303     if err.errno != errno.ENOENT:
2304       return (False, "Error while backing up cluster file: %s" % str(err))
2305   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2306   return (True, "Done")
2307
2308
2309 def _FindDisks(nodes_ip, disks):
2310   """Sets the physical ID on disks and returns the block devices.
2311
2312   """
2313   # set the correct physical ID
2314   my_name = utils.HostInfo().name
2315   for cf in disks:
2316     cf.SetPhysicalID(my_name, nodes_ip)
2317
2318   bdevs = []
2319
2320   for cf in disks:
2321     rd = _RecursiveFindBD(cf)
2322     if rd is None:
2323       return (False, "Can't find device %s" % cf)
2324     bdevs.append(rd)
2325   return (True, bdevs)
2326
2327
2328 def DrbdDisconnectNet(nodes_ip, disks):
2329   """Disconnects the network on a list of drbd devices.
2330
2331   """
2332   status, bdevs = _FindDisks(nodes_ip, disks)
2333   if not status:
2334     return status, bdevs
2335
2336   # disconnect disks
2337   for rd in bdevs:
2338     try:
2339       rd.DisconnectNet()
2340     except errors.BlockDeviceError, err:
2341       logging.exception("Failed to go into standalone mode")
2342       return (False, "Can't change network configuration: %s" % str(err))
2343   return (True, "All disks are now disconnected")
2344
2345
2346 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2347   """Attaches the network on a list of drbd devices.
2348
2349   """
2350   status, bdevs = _FindDisks(nodes_ip, disks)
2351   if not status:
2352     return status, bdevs
2353
2354   if multimaster:
2355     for idx, rd in enumerate(bdevs):
2356       try:
2357         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2358       except EnvironmentError, err:
2359         return (False, "Can't create symlink: %s" % str(err))
2360   # reconnect disks, switch to new master configuration and if
2361   # needed primary mode
2362   for rd in bdevs:
2363     try:
2364       rd.AttachNet(multimaster)
2365     except errors.BlockDeviceError, err:
2366       return (False, "Can't change network configuration: %s" % str(err))
2367   # wait until the disks are connected; we need to retry the re-attach
2368   # if the device becomes standalone, as this might happen if the one
2369   # node disconnects and reconnects in a different mode before the
2370   # other node reconnects; in this case, one or both of the nodes will
2371   # decide it has wrong configuration and switch to standalone
2372   RECONNECT_TIMEOUT = 2 * 60
2373   sleep_time = 0.100 # start with 100 miliseconds
2374   timeout_limit = time.time() + RECONNECT_TIMEOUT
2375   while time.time() < timeout_limit:
2376     all_connected = True
2377     for rd in bdevs:
2378       stats = rd.GetProcStatus()
2379       if not (stats.is_connected or stats.is_in_resync):
2380         all_connected = False
2381       if stats.is_standalone:
2382         # peer had different config info and this node became
2383         # standalone, even though this should not happen with the
2384         # new staged way of changing disk configs
2385         try:
2386           rd.ReAttachNet(multimaster)
2387         except errors.BlockDeviceError, err:
2388           return (False, "Can't change network configuration: %s" % str(err))
2389     if all_connected:
2390       break
2391     time.sleep(sleep_time)
2392     sleep_time = min(5, sleep_time * 1.5)
2393   if not all_connected:
2394     return (False, "Timeout in disk reconnecting")
2395   if multimaster:
2396     # change to primary mode
2397     for rd in bdevs:
2398       try:
2399         rd.Open()
2400       except errors.BlockDeviceError, err:
2401         return (False, "Can't change to primary mode: %s" % str(err))
2402   if multimaster:
2403     msg = "multi-master and primary"
2404   else:
2405     msg = "single-master"
2406   return (True, "Disks are now configured as %s" % msg)
2407
2408
2409 def DrbdWaitSync(nodes_ip, disks):
2410   """Wait until DRBDs have synchronized.
2411
2412   """
2413   status, bdevs = _FindDisks(nodes_ip, disks)
2414   if not status:
2415     return status, bdevs
2416
2417   min_resync = 100
2418   alldone = True
2419   failure = False
2420   for rd in bdevs:
2421     stats = rd.GetProcStatus()
2422     if not (stats.is_connected or stats.is_in_resync):
2423       failure = True
2424       break
2425     alldone = alldone and (not stats.is_in_resync)
2426     if stats.sync_percent is not None:
2427       min_resync = min(min_resync, stats.sync_percent)
2428   return (not failure, (alldone, min_resync))
2429
2430
2431 class HooksRunner(object):
2432   """Hook runner.
2433
2434   This class is instantiated on the node side (ganeti-noded) and not
2435   on the master side.
2436
2437   """
2438   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2439
2440   def __init__(self, hooks_base_dir=None):
2441     """Constructor for hooks runner.
2442
2443     @type hooks_base_dir: str or None
2444     @param hooks_base_dir: if not None, this overrides the
2445         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2446
2447     """
2448     if hooks_base_dir is None:
2449       hooks_base_dir = constants.HOOKS_BASE_DIR
2450     self._BASE_DIR = hooks_base_dir
2451
2452   @staticmethod
2453   def ExecHook(script, env):
2454     """Exec one hook script.
2455
2456     @type script: str
2457     @param script: the full path to the script
2458     @type env: dict
2459     @param env: the environment with which to exec the script
2460     @rtype: tuple (success, message)
2461     @return: a tuple of success and message, where success
2462         indicates the succes of the operation, and message
2463         which will contain the error details in case we
2464         failed
2465
2466     """
2467     # exec the process using subprocess and log the output
2468     fdstdin = None
2469     try:
2470       fdstdin = open("/dev/null", "r")
2471       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2472                                stderr=subprocess.STDOUT, close_fds=True,
2473                                shell=False, cwd="/", env=env)
2474       output = ""
2475       try:
2476         output = child.stdout.read(4096)
2477         child.stdout.close()
2478       except EnvironmentError, err:
2479         output += "Hook script error: %s" % str(err)
2480
2481       while True:
2482         try:
2483           result = child.wait()
2484           break
2485         except EnvironmentError, err:
2486           if err.errno == errno.EINTR:
2487             continue
2488           raise
2489     finally:
2490       # try not to leak fds
2491       for fd in (fdstdin, ):
2492         if fd is not None:
2493           try:
2494             fd.close()
2495           except EnvironmentError, err:
2496             # just log the error
2497             #logging.exception("Error while closing fd %s", fd)
2498             pass
2499
2500     return result == 0, utils.SafeEncode(output.strip())
2501
2502   def RunHooks(self, hpath, phase, env):
2503     """Run the scripts in the hooks directory.
2504
2505     @type hpath: str
2506     @param hpath: the path to the hooks directory which
2507         holds the scripts
2508     @type phase: str
2509     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2510         L{constants.HOOKS_PHASE_POST}
2511     @type env: dict
2512     @param env: dictionary with the environment for the hook
2513     @rtype: list
2514     @return: list of 3-element tuples:
2515       - script path
2516       - script result, either L{constants.HKR_SUCCESS} or
2517         L{constants.HKR_FAIL}
2518       - output of the script
2519
2520     @raise errors.ProgrammerError: for invalid input
2521         parameters
2522
2523     """
2524     if phase == constants.HOOKS_PHASE_PRE:
2525       suffix = "pre"
2526     elif phase == constants.HOOKS_PHASE_POST:
2527       suffix = "post"
2528     else:
2529       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2530     rr = []
2531
2532     subdir = "%s-%s.d" % (hpath, suffix)
2533     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2534     try:
2535       dir_contents = utils.ListVisibleFiles(dir_name)
2536     except OSError, err:
2537       # FIXME: must log output in case of failures
2538       return rr
2539
2540     # we use the standard python sort order,
2541     # so 00name is the recommended naming scheme
2542     dir_contents.sort()
2543     for relname in dir_contents:
2544       fname = os.path.join(dir_name, relname)
2545       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2546           self.RE_MASK.match(relname) is not None):
2547         rrval = constants.HKR_SKIP
2548         output = ""
2549       else:
2550         result, output = self.ExecHook(fname, env)
2551         if not result:
2552           rrval = constants.HKR_FAIL
2553         else:
2554           rrval = constants.HKR_SUCCESS
2555       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2556
2557     return rr
2558
2559
2560 class IAllocatorRunner(object):
2561   """IAllocator runner.
2562
2563   This class is instantiated on the node side (ganeti-noded) and not on
2564   the master side.
2565
2566   """
2567   def Run(self, name, idata):
2568     """Run an iallocator script.
2569
2570     @type name: str
2571     @param name: the iallocator script name
2572     @type idata: str
2573     @param idata: the allocator input data
2574
2575     @rtype: tuple
2576     @return: four element tuple of:
2577        - run status (one of the IARUN_ constants)
2578        - stdout
2579        - stderr
2580        - fail reason (as from L{utils.RunResult})
2581
2582     """
2583     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2584                                   os.path.isfile)
2585     if alloc_script is None:
2586       return (constants.IARUN_NOTFOUND, None, None, None)
2587
2588     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2589     try:
2590       os.write(fd, idata)
2591       os.close(fd)
2592       result = utils.RunCmd([alloc_script, fin_name])
2593       if result.failed:
2594         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2595                 result.fail_reason)
2596     finally:
2597       os.unlink(fin_name)
2598
2599     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2600
2601
2602 class DevCacheManager(object):
2603   """Simple class for managing a cache of block device information.
2604
2605   """
2606   _DEV_PREFIX = "/dev/"
2607   _ROOT_DIR = constants.BDEV_CACHE_DIR
2608
2609   @classmethod
2610   def _ConvertPath(cls, dev_path):
2611     """Converts a /dev/name path to the cache file name.
2612
2613     This replaces slashes with underscores and strips the /dev
2614     prefix. It then returns the full path to the cache file.
2615
2616     @type dev_path: str
2617     @param dev_path: the C{/dev/} path name
2618     @rtype: str
2619     @return: the converted path name
2620
2621     """
2622     if dev_path.startswith(cls._DEV_PREFIX):
2623       dev_path = dev_path[len(cls._DEV_PREFIX):]
2624     dev_path = dev_path.replace("/", "_")
2625     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2626     return fpath
2627
2628   @classmethod
2629   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2630     """Updates the cache information for a given device.
2631
2632     @type dev_path: str
2633     @param dev_path: the pathname of the device
2634     @type owner: str
2635     @param owner: the owner (instance name) of the device
2636     @type on_primary: bool
2637     @param on_primary: whether this is the primary
2638         node nor not
2639     @type iv_name: str
2640     @param iv_name: the instance-visible name of the
2641         device, as in objects.Disk.iv_name
2642
2643     @rtype: None
2644
2645     """
2646     if dev_path is None:
2647       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2648       return
2649     fpath = cls._ConvertPath(dev_path)
2650     if on_primary:
2651       state = "primary"
2652     else:
2653       state = "secondary"
2654     if iv_name is None:
2655       iv_name = "not_visible"
2656     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2657     try:
2658       utils.WriteFile(fpath, data=fdata)
2659     except EnvironmentError, err:
2660       logging.exception("Can't update bdev cache for %s", dev_path)
2661
2662   @classmethod
2663   def RemoveCache(cls, dev_path):
2664     """Remove data for a dev_path.
2665
2666     This is just a wrapper over L{utils.RemoveFile} with a converted
2667     path name and logging.
2668
2669     @type dev_path: str
2670     @param dev_path: the pathname of the device
2671
2672     @rtype: None
2673
2674     """
2675     if dev_path is None:
2676       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2677       return
2678     fpath = cls._ConvertPath(dev_path)
2679     try:
2680       utils.RemoveFile(fpath)
2681     except EnvironmentError, err:
2682       logging.exception("Can't update bdev cache for %s", dev_path)