Rename the device type constants
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     msg = "Error while processing user ssh files"
264     logging.exception(msg)
265     return (False, "%s: %s" % (msg, err))
266
267   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
268     utils.WriteFile(name, data=content, mode=0600)
269
270   utils.AddAuthorizedKey(auth_keys, sshpub)
271
272   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
273
274   return (True, "Node added successfully")
275
276
277 def LeaveCluster():
278   """Cleans up and remove the current node.
279
280   This function cleans up and prepares the current node to be removed
281   from the cluster.
282
283   If processing is successful, then it raises an
284   L{errors.QuitGanetiException} which is used as a special case to
285   shutdown the node daemon.
286
287   """
288   _CleanDirectory(constants.DATA_DIR)
289   JobQueuePurge()
290
291   try:
292     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
293   except errors.OpExecError:
294     logging.exception("Error while processing ssh files")
295     return
296
297   f = open(pub_key, 'r')
298   try:
299     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
300   finally:
301     f.close()
302
303   utils.RemoveFile(priv_key)
304   utils.RemoveFile(pub_key)
305
306   # Return a reassuring string to the caller, and quit
307   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
308
309
310 def GetNodeInfo(vgname, hypervisor_type):
311   """Gives back a hash with different informations about the node.
312
313   @type vgname: C{string}
314   @param vgname: the name of the volume group to ask for disk space information
315   @type hypervisor_type: C{str}
316   @param hypervisor_type: the name of the hypervisor to ask for
317       memory information
318   @rtype: C{dict}
319   @return: dictionary with the following keys:
320       - vg_size is the size of the configured volume group in MiB
321       - vg_free is the free size of the volume group in MiB
322       - memory_dom0 is the memory allocated for domain0 in MiB
323       - memory_free is the currently available (free) ram in MiB
324       - memory_total is the total number of ram in MiB
325
326   """
327   outputarray = {}
328   vginfo = _GetVGInfo(vgname)
329   outputarray['vg_size'] = vginfo['vg_size']
330   outputarray['vg_free'] = vginfo['vg_free']
331
332   hyper = hypervisor.GetHypervisor(hypervisor_type)
333   hyp_info = hyper.GetNodeInfo()
334   if hyp_info is not None:
335     outputarray.update(hyp_info)
336
337   f = open("/proc/sys/kernel/random/boot_id", 'r')
338   try:
339     outputarray["bootid"] = f.read(128).rstrip("\n")
340   finally:
341     f.close()
342
343   return outputarray
344
345
346 def VerifyNode(what, cluster_name):
347   """Verify the status of the local node.
348
349   Based on the input L{what} parameter, various checks are done on the
350   local node.
351
352   If the I{filelist} key is present, this list of
353   files is checksummed and the file/checksum pairs are returned.
354
355   If the I{nodelist} key is present, we check that we have
356   connectivity via ssh with the target nodes (and check the hostname
357   report).
358
359   If the I{node-net-test} key is present, we check that we have
360   connectivity to the given nodes via both primary IP and, if
361   applicable, secondary IPs.
362
363   @type what: C{dict}
364   @param what: a dictionary of things to check:
365       - filelist: list of files for which to compute checksums
366       - nodelist: list of nodes we should check ssh communication with
367       - node-net-test: list of nodes we should check node daemon port
368         connectivity with
369       - hypervisor: list with hypervisors to run the verify for
370   @rtype: dict
371   @return: a dictionary with the same keys as the input dict, and
372       values representing the result of the checks
373
374   """
375   result = {}
376
377   if constants.NV_HYPERVISOR in what:
378     result[constants.NV_HYPERVISOR] = tmp = {}
379     for hv_name in what[constants.NV_HYPERVISOR]:
380       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
381
382   if constants.NV_FILELIST in what:
383     result[constants.NV_FILELIST] = utils.FingerprintFiles(
384       what[constants.NV_FILELIST])
385
386   if constants.NV_NODELIST in what:
387     result[constants.NV_NODELIST] = tmp = {}
388     random.shuffle(what[constants.NV_NODELIST])
389     for node in what[constants.NV_NODELIST]:
390       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
391       if not success:
392         tmp[node] = message
393
394   if constants.NV_NODENETTEST in what:
395     result[constants.NV_NODENETTEST] = tmp = {}
396     my_name = utils.HostInfo().name
397     my_pip = my_sip = None
398     for name, pip, sip in what[constants.NV_NODENETTEST]:
399       if name == my_name:
400         my_pip = pip
401         my_sip = sip
402         break
403     if not my_pip:
404       tmp[my_name] = ("Can't find my own primary/secondary IP"
405                       " in the node list")
406     else:
407       port = utils.GetNodeDaemonPort()
408       for name, pip, sip in what[constants.NV_NODENETTEST]:
409         fail = []
410         if not utils.TcpPing(pip, port, source=my_pip):
411           fail.append("primary")
412         if sip != pip:
413           if not utils.TcpPing(sip, port, source=my_sip):
414             fail.append("secondary")
415         if fail:
416           tmp[name] = ("failure using the %s interface(s)" %
417                        " and ".join(fail))
418
419   if constants.NV_LVLIST in what:
420     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
421
422   if constants.NV_INSTANCELIST in what:
423     result[constants.NV_INSTANCELIST] = GetInstanceList(
424       what[constants.NV_INSTANCELIST])
425
426   if constants.NV_VGLIST in what:
427     result[constants.NV_VGLIST] = ListVolumeGroups()
428
429   if constants.NV_VERSION in what:
430     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
431                                     constants.RELEASE_VERSION)
432
433   if constants.NV_HVINFO in what:
434     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
435     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
436
437   if constants.NV_DRBDLIST in what:
438     try:
439       used_minors = bdev.DRBD8.GetUsedDevs().keys()
440     except errors.BlockDeviceError:
441       logging.warning("Can't get used minors list", exc_info=True)
442       used_minors = []
443     result[constants.NV_DRBDLIST] = used_minors
444
445   return result
446
447
448 def GetVolumeList(vg_name):
449   """Compute list of logical volumes and their size.
450
451   @type vg_name: str
452   @param vg_name: the volume group whose LVs we should list
453   @rtype: dict
454   @return:
455       dictionary of all partions (key) with value being a tuple of
456       their size (in MiB), inactive and online status::
457
458         {'test1': ('20.06', True, True)}
459
460       in case of errors, a string is returned with the error
461       details.
462
463   """
464   lvs = {}
465   sep = '|'
466   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
467                          "--separator=%s" % sep,
468                          "-olv_name,lv_size,lv_attr", vg_name])
469   if result.failed:
470     logging.error("Failed to list logical volumes, lvs output: %s",
471                   result.output)
472     return result.output
473
474   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
475   for line in result.stdout.splitlines():
476     line = line.strip()
477     match = valid_line_re.match(line)
478     if not match:
479       logging.error("Invalid line returned from lvs output: '%s'", line)
480       continue
481     name, size, attr = match.groups()
482     inactive = attr[4] == '-'
483     online = attr[5] == 'o'
484     lvs[name] = (size, inactive, online)
485
486   return lvs
487
488
489 def ListVolumeGroups():
490   """List the volume groups and their size.
491
492   @rtype: dict
493   @return: dictionary with keys volume name and values the
494       size of the volume
495
496   """
497   return utils.ListVolumeGroups()
498
499
500 def NodeVolumes():
501   """List all volumes on this node.
502
503   @rtype: list
504   @return:
505     A list of dictionaries, each having four keys:
506       - name: the logical volume name,
507       - size: the size of the logical volume
508       - dev: the physical device on which the LV lives
509       - vg: the volume group to which it belongs
510
511     In case of errors, we return an empty list and log the
512     error.
513
514     Note that since a logical volume can live on multiple physical
515     volumes, the resulting list might include a logical volume
516     multiple times.
517
518   """
519   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
520                          "--separator=|",
521                          "--options=lv_name,lv_size,devices,vg_name"])
522   if result.failed:
523     logging.error("Failed to list logical volumes, lvs output: %s",
524                   result.output)
525     return []
526
527   def parse_dev(dev):
528     if '(' in dev:
529       return dev.split('(')[0]
530     else:
531       return dev
532
533   def map_line(line):
534     return {
535       'name': line[0].strip(),
536       'size': line[1].strip(),
537       'dev': parse_dev(line[2].strip()),
538       'vg': line[3].strip(),
539     }
540
541   return [map_line(line.split('|')) for line in result.stdout.splitlines()
542           if line.count('|') >= 3]
543
544
545 def BridgesExist(bridges_list):
546   """Check if a list of bridges exist on the current node.
547
548   @rtype: boolean
549   @return: C{True} if all of them exist, C{False} otherwise
550
551   """
552   for bridge in bridges_list:
553     if not utils.BridgeExists(bridge):
554       return False
555
556   return True
557
558
559 def GetInstanceList(hypervisor_list):
560   """Provides a list of instances.
561
562   @type hypervisor_list: list
563   @param hypervisor_list: the list of hypervisors to query information
564
565   @rtype: list
566   @return: a list of all running instances on the current node
567     - instance1.example.com
568     - instance2.example.com
569
570   """
571   results = []
572   for hname in hypervisor_list:
573     try:
574       names = hypervisor.GetHypervisor(hname).ListInstances()
575       results.extend(names)
576     except errors.HypervisorError, err:
577       logging.exception("Error enumerating instances for hypevisor %s", hname)
578       # FIXME: should we somehow not propagate this to the master?
579       raise
580
581   return results
582
583
584 def GetInstanceInfo(instance, hname):
585   """Gives back the informations about an instance as a dictionary.
586
587   @type instance: string
588   @param instance: the instance name
589   @type hname: string
590   @param hname: the hypervisor type of the instance
591
592   @rtype: dict
593   @return: dictionary with the following keys:
594       - memory: memory size of instance (int)
595       - state: xen state of instance (string)
596       - time: cpu time of instance (float)
597
598   """
599   output = {}
600
601   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
602   if iinfo is not None:
603     output['memory'] = iinfo[2]
604     output['state'] = iinfo[4]
605     output['time'] = iinfo[5]
606
607   return output
608
609
610 def GetInstanceMigratable(instance):
611   """Gives whether an instance can be migrated.
612
613   @type instance: L{objects.Instance}
614   @param instance: object representing the instance to be checked.
615
616   @rtype: tuple
617   @return: tuple of (result, description) where:
618       - result: whether the instance can be migrated or not
619       - description: a description of the issue, if relevant
620
621   """
622   hyper = hypervisor.GetHypervisor(instance.hypervisor)
623   if instance.name not in hyper.ListInstances():
624     return (False, 'not running')
625
626   for idx in range(len(instance.disks)):
627     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
628     if not os.path.islink(link_name):
629       return (False, 'not restarted since ganeti 1.2.5')
630
631   return (True, '')
632
633
634 def GetAllInstancesInfo(hypervisor_list):
635   """Gather data about all instances.
636
637   This is the equivalent of L{GetInstanceInfo}, except that it
638   computes data for all instances at once, thus being faster if one
639   needs data about more than one instance.
640
641   @type hypervisor_list: list
642   @param hypervisor_list: list of hypervisors to query for instance data
643
644   @rtype: dict
645   @return: dictionary of instance: data, with data having the following keys:
646       - memory: memory size of instance (int)
647       - state: xen state of instance (string)
648       - time: cpu time of instance (float)
649       - vcpus: the number of vcpus
650
651   """
652   output = {}
653
654   for hname in hypervisor_list:
655     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
656     if iinfo:
657       for name, inst_id, memory, vcpus, state, times in iinfo:
658         value = {
659           'memory': memory,
660           'vcpus': vcpus,
661           'state': state,
662           'time': times,
663           }
664         if name in output and output[name] != value:
665           raise errors.HypervisorError("Instance %s running duplicate"
666                                        " with different parameters" % name)
667         output[name] = value
668
669   return output
670
671
672 def InstanceOsAdd(instance):
673   """Add an OS to an instance.
674
675   @type instance: L{objects.Instance}
676   @param instance: Instance whose OS is to be installed
677   @rtype: boolean
678   @return: the success of the operation
679
680   """
681   try:
682     inst_os = OSFromDisk(instance.os)
683   except errors.InvalidOS, err:
684     os_name, os_dir, os_err = err.args
685     if os_dir is None:
686       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
687     else:
688       return (False, "Error parsing OS '%s' in directory %s: %s" %
689               (os_name, os_dir, os_err))
690
691   create_env = OSEnvironment(instance)
692
693   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
694                                      instance.name, int(time.time()))
695
696   result = utils.RunCmd([inst_os.create_script], env=create_env,
697                         cwd=inst_os.path, output=logfile,)
698   if result.failed:
699     logging.error("os create command '%s' returned error: %s, logfile: %s,"
700                   " output: %s", result.cmd, result.fail_reason, logfile,
701                   result.output)
702     lines = [utils.SafeEncode(val)
703              for val in utils.TailFile(logfile, lines=20)]
704     return (False, "OS create script failed (%s), last lines in the"
705             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
706
707   return (True, "Successfully installed")
708
709
710 def RunRenameInstance(instance, old_name):
711   """Run the OS rename script for an instance.
712
713   @type instance: L{objects.Instance}
714   @param instance: Instance whose OS is to be installed
715   @type old_name: string
716   @param old_name: previous instance name
717   @rtype: boolean
718   @return: the success of the operation
719
720   """
721   inst_os = OSFromDisk(instance.os)
722
723   rename_env = OSEnvironment(instance)
724   rename_env['OLD_INSTANCE_NAME'] = old_name
725
726   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
727                                            old_name,
728                                            instance.name, int(time.time()))
729
730   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
731                         cwd=inst_os.path, output=logfile)
732
733   if result.failed:
734     logging.error("os create command '%s' returned error: %s output: %s",
735                   result.cmd, result.fail_reason, result.output)
736     lines = [utils.SafeEncode(val)
737              for val in utils.TailFile(logfile, lines=20)]
738     return (False, "OS rename script failed (%s), last lines in the"
739             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
740
741   return (True, "Rename successful")
742
743
744 def _GetVGInfo(vg_name):
745   """Get informations about the volume group.
746
747   @type vg_name: str
748   @param vg_name: the volume group which we query
749   @rtype: dict
750   @return:
751     A dictionary with the following keys:
752       - C{vg_size} is the total size of the volume group in MiB
753       - C{vg_free} is the free size of the volume group in MiB
754       - C{pv_count} are the number of physical disks in that VG
755
756     If an error occurs during gathering of data, we return the same dict
757     with keys all set to None.
758
759   """
760   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
761
762   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
763                          "--nosuffix", "--units=m", "--separator=:", vg_name])
764
765   if retval.failed:
766     logging.error("volume group %s not present", vg_name)
767     return retdic
768   valarr = retval.stdout.strip().rstrip(':').split(':')
769   if len(valarr) == 3:
770     try:
771       retdic = {
772         "vg_size": int(round(float(valarr[0]), 0)),
773         "vg_free": int(round(float(valarr[1]), 0)),
774         "pv_count": int(valarr[2]),
775         }
776     except ValueError, err:
777       logging.exception("Fail to parse vgs output")
778   else:
779     logging.error("vgs output has the wrong number of fields (expected"
780                   " three): %s", str(valarr))
781   return retdic
782
783
784 def _GetBlockDevSymlinkPath(instance_name, idx):
785   return os.path.join(constants.DISK_LINKS_DIR,
786                       "%s:%d" % (instance_name, idx))
787
788
789 def _SymlinkBlockDev(instance_name, device_path, idx):
790   """Set up symlinks to a instance's block device.
791
792   This is an auxiliary function run when an instance is start (on the primary
793   node) or when an instance is migrated (on the target node).
794
795
796   @param instance_name: the name of the target instance
797   @param device_path: path of the physical block device, on the node
798   @param idx: the disk index
799   @return: absolute path to the disk's symlink
800
801   """
802   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
803   try:
804     os.symlink(device_path, link_name)
805   except OSError, err:
806     if err.errno == errno.EEXIST:
807       if (not os.path.islink(link_name) or
808           os.readlink(link_name) != device_path):
809         os.remove(link_name)
810         os.symlink(device_path, link_name)
811     else:
812       raise
813
814   return link_name
815
816
817 def _RemoveBlockDevLinks(instance_name, disks):
818   """Remove the block device symlinks belonging to the given instance.
819
820   """
821   for idx, disk in enumerate(disks):
822     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
823     if os.path.islink(link_name):
824       try:
825         os.remove(link_name)
826       except OSError:
827         logging.exception("Can't remove symlink '%s'", link_name)
828
829
830 def _GatherAndLinkBlockDevs(instance):
831   """Set up an instance's block device(s).
832
833   This is run on the primary node at instance startup. The block
834   devices must be already assembled.
835
836   @type instance: L{objects.Instance}
837   @param instance: the instance whose disks we shoul assemble
838   @rtype: list
839   @return: list of (disk_object, device_path)
840
841   """
842   block_devices = []
843   for idx, disk in enumerate(instance.disks):
844     device = _RecursiveFindBD(disk)
845     if device is None:
846       raise errors.BlockDeviceError("Block device '%s' is not set up." %
847                                     str(disk))
848     device.Open()
849     try:
850       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
851     except OSError, e:
852       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
853                                     e.strerror)
854
855     block_devices.append((disk, link_name))
856
857   return block_devices
858
859
860 def StartInstance(instance, extra_args):
861   """Start an instance.
862
863   @type instance: L{objects.Instance}
864   @param instance: the instance object
865   @rtype: boolean
866   @return: whether the startup was successful or not
867
868   """
869   running_instances = GetInstanceList([instance.hypervisor])
870
871   if instance.name in running_instances:
872     return (True, "Already running")
873
874   try:
875     block_devices = _GatherAndLinkBlockDevs(instance)
876     hyper = hypervisor.GetHypervisor(instance.hypervisor)
877     hyper.StartInstance(instance, block_devices, extra_args)
878   except errors.BlockDeviceError, err:
879     logging.exception("Failed to start instance")
880     return (False, "Block device error: %s" % str(err))
881   except errors.HypervisorError, err:
882     logging.exception("Failed to start instance")
883     _RemoveBlockDevLinks(instance.name, instance.disks)
884     return (False, "Hypervisor error: %s" % str(err))
885
886   return (True, "Instance started successfully")
887
888
889 def ShutdownInstance(instance):
890   """Shut an instance down.
891
892   @note: this functions uses polling with a hardcoded timeout.
893
894   @type instance: L{objects.Instance}
895   @param instance: the instance object
896   @rtype: boolean
897   @return: whether the startup was successful or not
898
899   """
900   hv_name = instance.hypervisor
901   running_instances = GetInstanceList([hv_name])
902
903   if instance.name not in running_instances:
904     return True
905
906   hyper = hypervisor.GetHypervisor(hv_name)
907   try:
908     hyper.StopInstance(instance)
909   except errors.HypervisorError, err:
910     logging.error("Failed to stop instance: %s" % err)
911     return False
912
913   # test every 10secs for 2min
914
915   time.sleep(1)
916   for dummy in range(11):
917     if instance.name not in GetInstanceList([hv_name]):
918       break
919     time.sleep(10)
920   else:
921     # the shutdown did not succeed
922     logging.error("Shutdown of '%s' unsuccessful, using destroy",
923                   instance.name)
924
925     try:
926       hyper.StopInstance(instance, force=True)
927     except errors.HypervisorError, err:
928       logging.exception("Failed to stop instance: %s" % err)
929       return False
930
931     time.sleep(1)
932     if instance.name in GetInstanceList([hv_name]):
933       logging.error("Could not shutdown instance '%s' even by destroy",
934                     instance.name)
935       return False
936
937   _RemoveBlockDevLinks(instance.name, instance.disks)
938
939   return True
940
941
942 def RebootInstance(instance, reboot_type, extra_args):
943   """Reboot an instance.
944
945   @type instance: L{objects.Instance}
946   @param instance: the instance object to reboot
947   @type reboot_type: str
948   @param reboot_type: the type of reboot, one the following
949     constants:
950       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
951         instance OS, do not recreate the VM
952       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
953         restart the VM (at the hypervisor level)
954       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
955         is not accepted here, since that mode is handled
956         differently
957   @rtype: boolean
958   @return: the success of the operation
959
960   """
961   running_instances = GetInstanceList([instance.hypervisor])
962
963   if instance.name not in running_instances:
964     logging.error("Cannot reboot instance that is not running")
965     return False
966
967   hyper = hypervisor.GetHypervisor(instance.hypervisor)
968   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
969     try:
970       hyper.RebootInstance(instance)
971     except errors.HypervisorError, err:
972       logging.exception("Failed to soft reboot instance")
973       return False
974   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
975     try:
976       ShutdownInstance(instance)
977       StartInstance(instance, extra_args)
978     except errors.HypervisorError, err:
979       logging.exception("Failed to hard reboot instance")
980       return False
981   else:
982     raise errors.ParameterError("reboot_type invalid")
983
984   return True
985
986
987 def MigrationInfo(instance):
988   """Gather information about an instance to be migrated.
989
990   @type instance: L{objects.Instance}
991   @param instance: the instance definition
992
993   """
994   hyper = hypervisor.GetHypervisor(instance.hypervisor)
995   try:
996     info = hyper.MigrationInfo(instance)
997   except errors.HypervisorError, err:
998     msg = "Failed to fetch migration information"
999     logging.exception(msg)
1000     return (False, '%s: %s' % (msg, err))
1001   return (True, info)
1002
1003
1004 def AcceptInstance(instance, info, target):
1005   """Prepare the node to accept an instance.
1006
1007   @type instance: L{objects.Instance}
1008   @param instance: the instance definition
1009   @type info: string/data (opaque)
1010   @param info: migration information, from the source node
1011   @type target: string
1012   @param target: target host (usually ip), on this node
1013
1014   """
1015   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1016   try:
1017     hyper.AcceptInstance(instance, info, target)
1018   except errors.HypervisorError, err:
1019     msg = "Failed to accept instance"
1020     logging.exception(msg)
1021     return (False, '%s: %s' % (msg, err))
1022   return (True, "Accept successfull")
1023
1024
1025 def FinalizeMigration(instance, info, success):
1026   """Finalize any preparation to accept an instance.
1027
1028   @type instance: L{objects.Instance}
1029   @param instance: the instance definition
1030   @type info: string/data (opaque)
1031   @param info: migration information, from the source node
1032   @type success: boolean
1033   @param success: whether the migration was a success or a failure
1034
1035   """
1036   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1037   try:
1038     hyper.FinalizeMigration(instance, info, success)
1039   except errors.HypervisorError, err:
1040     msg = "Failed to finalize migration"
1041     logging.exception(msg)
1042     return (False, '%s: %s' % (msg, err))
1043   return (True, "Migration Finalized")
1044
1045
1046 def MigrateInstance(instance, target, live):
1047   """Migrates an instance to another node.
1048
1049   @type instance: L{objects.Instance}
1050   @param instance: the instance definition
1051   @type target: string
1052   @param target: the target node name
1053   @type live: boolean
1054   @param live: whether the migration should be done live or not (the
1055       interpretation of this parameter is left to the hypervisor)
1056   @rtype: tuple
1057   @return: a tuple of (success, msg) where:
1058       - succes is a boolean denoting the success/failure of the operation
1059       - msg is a string with details in case of failure
1060
1061   """
1062   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1063
1064   try:
1065     hyper.MigrateInstance(instance.name, target, live)
1066   except errors.HypervisorError, err:
1067     msg = "Failed to migrate instance"
1068     logging.exception(msg)
1069     return (False, "%s: %s" % (msg, err))
1070   return (True, "Migration successfull")
1071
1072
1073 def BlockdevCreate(disk, size, owner, on_primary, info):
1074   """Creates a block device for an instance.
1075
1076   @type disk: L{objects.Disk}
1077   @param disk: the object describing the disk we should create
1078   @type size: int
1079   @param size: the size of the physical underlying device, in MiB
1080   @type owner: str
1081   @param owner: the name of the instance for which disk is created,
1082       used for device cache data
1083   @type on_primary: boolean
1084   @param on_primary:  indicates if it is the primary node or not
1085   @type info: string
1086   @param info: string that will be sent to the physical device
1087       creation, used for example to set (LVM) tags on LVs
1088
1089   @return: the new unique_id of the device (this can sometime be
1090       computed only after creation), or None. On secondary nodes,
1091       it's not required to return anything.
1092
1093   """
1094   clist = []
1095   if disk.children:
1096     for child in disk.children:
1097       crdev = _RecursiveAssembleBD(child, owner, on_primary)
1098       if on_primary or disk.AssembleOnSecondary():
1099         # we need the children open in case the device itself has to
1100         # be assembled
1101         crdev.Open()
1102       clist.append(crdev)
1103
1104   try:
1105     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1106   except errors.GenericError, err:
1107     return False, "Can't create block device: %s" % str(err)
1108
1109   if on_primary or disk.AssembleOnSecondary():
1110     if not device.Assemble():
1111       errorstring = "Can't assemble device after creation, very unusual event"
1112       logging.error(errorstring)
1113       return False, errorstring
1114     device.SetSyncSpeed(constants.SYNC_SPEED)
1115     if on_primary or disk.OpenOnSecondary():
1116       device.Open(force=True)
1117     DevCacheManager.UpdateCache(device.dev_path, owner,
1118                                 on_primary, disk.iv_name)
1119
1120   device.SetInfo(info)
1121
1122   physical_id = device.unique_id
1123   return True, physical_id
1124
1125
1126 def BlockdevRemove(disk):
1127   """Remove a block device.
1128
1129   @note: This is intended to be called recursively.
1130
1131   @type disk: L{objects.Disk}
1132   @param disk: the disk object we should remove
1133   @rtype: boolean
1134   @return: the success of the operation
1135
1136   """
1137   try:
1138     rdev = _RecursiveFindBD(disk)
1139   except errors.BlockDeviceError, err:
1140     # probably can't attach
1141     logging.info("Can't attach to device %s in remove", disk)
1142     rdev = None
1143   if rdev is not None:
1144     r_path = rdev.dev_path
1145     result = rdev.Remove()
1146     if result:
1147       DevCacheManager.RemoveCache(r_path)
1148   else:
1149     result = True
1150   if disk.children:
1151     for child in disk.children:
1152       result = result and BlockdevRemove(child)
1153   return result
1154
1155
1156 def _RecursiveAssembleBD(disk, owner, as_primary):
1157   """Activate a block device for an instance.
1158
1159   This is run on the primary and secondary nodes for an instance.
1160
1161   @note: this function is called recursively.
1162
1163   @type disk: L{objects.Disk}
1164   @param disk: the disk we try to assemble
1165   @type owner: str
1166   @param owner: the name of the instance which owns the disk
1167   @type as_primary: boolean
1168   @param as_primary: if we should make the block device
1169       read/write
1170
1171   @return: the assembled device or None (in case no device
1172       was assembled)
1173   @raise errors.BlockDeviceError: in case there is an error
1174       during the activation of the children or the device
1175       itself
1176
1177   """
1178   children = []
1179   if disk.children:
1180     mcn = disk.ChildrenNeeded()
1181     if mcn == -1:
1182       mcn = 0 # max number of Nones allowed
1183     else:
1184       mcn = len(disk.children) - mcn # max number of Nones
1185     for chld_disk in disk.children:
1186       try:
1187         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1188       except errors.BlockDeviceError, err:
1189         if children.count(None) >= mcn:
1190           raise
1191         cdev = None
1192         logging.error("Error in child activation: %s", str(err))
1193       children.append(cdev)
1194
1195   if as_primary or disk.AssembleOnSecondary():
1196     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1197     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1198     result = r_dev
1199     if as_primary or disk.OpenOnSecondary():
1200       r_dev.Open()
1201     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1202                                 as_primary, disk.iv_name)
1203
1204   else:
1205     result = True
1206   return result
1207
1208
1209 def BlockdevAssemble(disk, owner, as_primary):
1210   """Activate a block device for an instance.
1211
1212   This is a wrapper over _RecursiveAssembleBD.
1213
1214   @rtype: str or boolean
1215   @return: a C{/dev/...} path for primary nodes, and
1216       C{True} for secondary nodes
1217
1218   """
1219   result = _RecursiveAssembleBD(disk, owner, as_primary)
1220   if isinstance(result, bdev.BlockDev):
1221     result = result.dev_path
1222   return result
1223
1224
1225 def BlockdevShutdown(disk):
1226   """Shut down a block device.
1227
1228   First, if the device is assembled (Attach() is successfull), then
1229   the device is shutdown. Then the children of the device are
1230   shutdown.
1231
1232   This function is called recursively. Note that we don't cache the
1233   children or such, as oppossed to assemble, shutdown of different
1234   devices doesn't require that the upper device was active.
1235
1236   @type disk: L{objects.Disk}
1237   @param disk: the description of the disk we should
1238       shutdown
1239   @rtype: boolean
1240   @return: the success of the operation
1241
1242   """
1243   r_dev = _RecursiveFindBD(disk)
1244   if r_dev is not None:
1245     r_path = r_dev.dev_path
1246     result = r_dev.Shutdown()
1247     if result:
1248       DevCacheManager.RemoveCache(r_path)
1249   else:
1250     result = True
1251   if disk.children:
1252     for child in disk.children:
1253       result = result and BlockdevShutdown(child)
1254   return result
1255
1256
1257 def BlockdevAddchildren(parent_cdev, new_cdevs):
1258   """Extend a mirrored block device.
1259
1260   @type parent_cdev: L{objects.Disk}
1261   @param parent_cdev: the disk to which we should add children
1262   @type new_cdevs: list of L{objects.Disk}
1263   @param new_cdevs: the list of children which we should add
1264   @rtype: boolean
1265   @return: the success of the operation
1266
1267   """
1268   parent_bdev = _RecursiveFindBD(parent_cdev)
1269   if parent_bdev is None:
1270     logging.error("Can't find parent device")
1271     return False
1272   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1273   if new_bdevs.count(None) > 0:
1274     logging.error("Can't find new device(s) to add: %s:%s",
1275                   new_bdevs, new_cdevs)
1276     return False
1277   parent_bdev.AddChildren(new_bdevs)
1278   return True
1279
1280
1281 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1282   """Shrink a mirrored block device.
1283
1284   @type parent_cdev: L{objects.Disk}
1285   @param parent_cdev: the disk from which we should remove children
1286   @type new_cdevs: list of L{objects.Disk}
1287   @param new_cdevs: the list of children which we should remove
1288   @rtype: boolean
1289   @return: the success of the operation
1290
1291   """
1292   parent_bdev = _RecursiveFindBD(parent_cdev)
1293   if parent_bdev is None:
1294     logging.error("Can't find parent in remove children: %s", parent_cdev)
1295     return False
1296   devs = []
1297   for disk in new_cdevs:
1298     rpath = disk.StaticDevPath()
1299     if rpath is None:
1300       bd = _RecursiveFindBD(disk)
1301       if bd is None:
1302         logging.error("Can't find dynamic device %s while removing children",
1303                       disk)
1304         return False
1305       else:
1306         devs.append(bd.dev_path)
1307     else:
1308       devs.append(rpath)
1309   parent_bdev.RemoveChildren(devs)
1310   return True
1311
1312
1313 def BlockdevGetmirrorstatus(disks):
1314   """Get the mirroring status of a list of devices.
1315
1316   @type disks: list of L{objects.Disk}
1317   @param disks: the list of disks which we should query
1318   @rtype: disk
1319   @return:
1320       a list of (mirror_done, estimated_time) tuples, which
1321       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1322   @raise errors.BlockDeviceError: if any of the disks cannot be
1323       found
1324
1325   """
1326   stats = []
1327   for dsk in disks:
1328     rbd = _RecursiveFindBD(dsk)
1329     if rbd is None:
1330       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1331     stats.append(rbd.CombinedSyncStatus())
1332   return stats
1333
1334
1335 def _RecursiveFindBD(disk):
1336   """Check if a device is activated.
1337
1338   If so, return informations about the real device.
1339
1340   @type disk: L{objects.Disk}
1341   @param disk: the disk object we need to find
1342
1343   @return: None if the device can't be found,
1344       otherwise the device instance
1345
1346   """
1347   children = []
1348   if disk.children:
1349     for chdisk in disk.children:
1350       children.append(_RecursiveFindBD(chdisk))
1351
1352   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1353
1354
1355 def BlockdevFind(disk):
1356   """Check if a device is activated.
1357
1358   If it is, return informations about the real device.
1359
1360   @type disk: L{objects.Disk}
1361   @param disk: the disk to find
1362   @rtype: None or tuple
1363   @return: None if the disk cannot be found, otherwise a
1364       tuple (device_path, major, minor, sync_percent,
1365       estimated_time, is_degraded)
1366
1367   """
1368   try:
1369     rbd = _RecursiveFindBD(disk)
1370   except errors.BlockDeviceError, err:
1371     return (False, str(err))
1372   if rbd is None:
1373     return (True, None)
1374   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1375
1376
1377 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1378   """Write a file to the filesystem.
1379
1380   This allows the master to overwrite(!) a file. It will only perform
1381   the operation if the file belongs to a list of configuration files.
1382
1383   @type file_name: str
1384   @param file_name: the target file name
1385   @type data: str
1386   @param data: the new contents of the file
1387   @type mode: int
1388   @param mode: the mode to give the file (can be None)
1389   @type uid: int
1390   @param uid: the owner of the file (can be -1 for default)
1391   @type gid: int
1392   @param gid: the group of the file (can be -1 for default)
1393   @type atime: float
1394   @param atime: the atime to set on the file (can be None)
1395   @type mtime: float
1396   @param mtime: the mtime to set on the file (can be None)
1397   @rtype: boolean
1398   @return: the success of the operation; errors are logged
1399       in the node daemon log
1400
1401   """
1402   if not os.path.isabs(file_name):
1403     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1404                   file_name)
1405     return False
1406
1407   allowed_files = [
1408     constants.CLUSTER_CONF_FILE,
1409     constants.ETC_HOSTS,
1410     constants.SSH_KNOWN_HOSTS_FILE,
1411     constants.VNC_PASSWORD_FILE,
1412     ]
1413
1414   if file_name not in allowed_files:
1415     logging.error("Filename passed to UploadFile not in allowed"
1416                  " upload targets: '%s'", file_name)
1417     return False
1418
1419   raw_data = _Decompress(data)
1420
1421   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1422                   atime=atime, mtime=mtime)
1423   return True
1424
1425
1426 def WriteSsconfFiles(values):
1427   """Update all ssconf files.
1428
1429   Wrapper around the SimpleStore.WriteFiles.
1430
1431   """
1432   ssconf.SimpleStore().WriteFiles(values)
1433
1434
1435 def _ErrnoOrStr(err):
1436   """Format an EnvironmentError exception.
1437
1438   If the L{err} argument has an errno attribute, it will be looked up
1439   and converted into a textual C{E...} description. Otherwise the
1440   string representation of the error will be returned.
1441
1442   @type err: L{EnvironmentError}
1443   @param err: the exception to format
1444
1445   """
1446   if hasattr(err, 'errno'):
1447     detail = errno.errorcode[err.errno]
1448   else:
1449     detail = str(err)
1450   return detail
1451
1452
1453 def _OSOndiskVersion(name, os_dir):
1454   """Compute and return the API version of a given OS.
1455
1456   This function will try to read the API version of the OS given by
1457   the 'name' parameter and residing in the 'os_dir' directory.
1458
1459   @type name: str
1460   @param name: the OS name we should look for
1461   @type os_dir: str
1462   @param os_dir: the directory inwhich we should look for the OS
1463   @rtype: int or None
1464   @return:
1465       Either an integer denoting the version or None in the
1466       case when this is not a valid OS name.
1467   @raise errors.InvalidOS: if the OS cannot be found
1468
1469   """
1470   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1471
1472   try:
1473     st = os.stat(api_file)
1474   except EnvironmentError, err:
1475     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1476                            " found (%s)" % _ErrnoOrStr(err))
1477
1478   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1479     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1480                            " a regular file")
1481
1482   try:
1483     f = open(api_file)
1484     try:
1485       api_versions = f.readlines()
1486     finally:
1487       f.close()
1488   except EnvironmentError, err:
1489     raise errors.InvalidOS(name, os_dir, "error while reading the"
1490                            " API version (%s)" % _ErrnoOrStr(err))
1491
1492   api_versions = [version.strip() for version in api_versions]
1493   try:
1494     api_versions = [int(version) for version in api_versions]
1495   except (TypeError, ValueError), err:
1496     raise errors.InvalidOS(name, os_dir,
1497                            "API version is not integer (%s)" % str(err))
1498
1499   return api_versions
1500
1501
1502 def DiagnoseOS(top_dirs=None):
1503   """Compute the validity for all OSes.
1504
1505   @type top_dirs: list
1506   @param top_dirs: the list of directories in which to
1507       search (if not given defaults to
1508       L{constants.OS_SEARCH_PATH})
1509   @rtype: list of L{objects.OS}
1510   @return: an OS object for each name in all the given
1511       directories
1512
1513   """
1514   if top_dirs is None:
1515     top_dirs = constants.OS_SEARCH_PATH
1516
1517   result = []
1518   for dir_name in top_dirs:
1519     if os.path.isdir(dir_name):
1520       try:
1521         f_names = utils.ListVisibleFiles(dir_name)
1522       except EnvironmentError, err:
1523         logging.exception("Can't list the OS directory %s", dir_name)
1524         break
1525       for name in f_names:
1526         try:
1527           os_inst = OSFromDisk(name, base_dir=dir_name)
1528           result.append(os_inst)
1529         except errors.InvalidOS, err:
1530           result.append(objects.OS.FromInvalidOS(err))
1531
1532   return result
1533
1534
1535 def OSFromDisk(name, base_dir=None):
1536   """Create an OS instance from disk.
1537
1538   This function will return an OS instance if the given name is a
1539   valid OS name. Otherwise, it will raise an appropriate
1540   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1541
1542   @type base_dir: string
1543   @keyword base_dir: Base directory containing OS installations.
1544                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1545   @rtype: L{objects.OS}
1546   @return: the OS instance if we find a valid one
1547   @raise errors.InvalidOS: if we don't find a valid OS
1548
1549   """
1550   if base_dir is None:
1551     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1552     if os_dir is None:
1553       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1554   else:
1555     os_dir = os.path.sep.join([base_dir, name])
1556
1557   api_versions = _OSOndiskVersion(name, os_dir)
1558
1559   if constants.OS_API_VERSION not in api_versions:
1560     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1561                            " (found %s want %s)"
1562                            % (api_versions, constants.OS_API_VERSION))
1563
1564   # OS Scripts dictionary, we will populate it with the actual script names
1565   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1566
1567   for script in os_scripts:
1568     os_scripts[script] = os.path.sep.join([os_dir, script])
1569
1570     try:
1571       st = os.stat(os_scripts[script])
1572     except EnvironmentError, err:
1573       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1574                              (script, _ErrnoOrStr(err)))
1575
1576     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1577       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1578                              script)
1579
1580     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1581       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1582                              script)
1583
1584
1585   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1586                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1587                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1588                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1589                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1590                     api_versions=api_versions)
1591
1592 def OSEnvironment(instance, debug=0):
1593   """Calculate the environment for an os script.
1594
1595   @type instance: L{objects.Instance}
1596   @param instance: target instance for the os script run
1597   @type debug: integer
1598   @param debug: debug level (0 or 1, for OS Api 10)
1599   @rtype: dict
1600   @return: dict of environment variables
1601   @raise errors.BlockDeviceError: if the block device
1602       cannot be found
1603
1604   """
1605   result = {}
1606   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1607   result['INSTANCE_NAME'] = instance.name
1608   result['INSTANCE_OS'] = instance.os
1609   result['HYPERVISOR'] = instance.hypervisor
1610   result['DISK_COUNT'] = '%d' % len(instance.disks)
1611   result['NIC_COUNT'] = '%d' % len(instance.nics)
1612   result['DEBUG_LEVEL'] = '%d' % debug
1613   for idx, disk in enumerate(instance.disks):
1614     real_disk = _RecursiveFindBD(disk)
1615     if real_disk is None:
1616       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1617                                     str(disk))
1618     real_disk.Open()
1619     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1620     # FIXME: When disks will have read-only mode, populate this
1621     result['DISK_%d_ACCESS' % idx] = disk.mode
1622     if constants.HV_DISK_TYPE in instance.hvparams:
1623       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1624         instance.hvparams[constants.HV_DISK_TYPE]
1625     if disk.dev_type in constants.LDS_BLOCK:
1626       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1627     elif disk.dev_type == constants.LD_FILE:
1628       result['DISK_%d_BACKEND_TYPE' % idx] = \
1629         'file:%s' % disk.physical_id[0]
1630   for idx, nic in enumerate(instance.nics):
1631     result['NIC_%d_MAC' % idx] = nic.mac
1632     if nic.ip:
1633       result['NIC_%d_IP' % idx] = nic.ip
1634     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1635     if constants.HV_NIC_TYPE in instance.hvparams:
1636       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1637         instance.hvparams[constants.HV_NIC_TYPE]
1638
1639   return result
1640
1641 def BlockdevGrow(disk, amount):
1642   """Grow a stack of block devices.
1643
1644   This function is called recursively, with the childrens being the
1645   first ones to resize.
1646
1647   @type disk: L{objects.Disk}
1648   @param disk: the disk to be grown
1649   @rtype: (status, result)
1650   @return: a tuple with the status of the operation
1651       (True/False), and the errors message if status
1652       is False
1653
1654   """
1655   r_dev = _RecursiveFindBD(disk)
1656   if r_dev is None:
1657     return False, "Cannot find block device %s" % (disk,)
1658
1659   try:
1660     r_dev.Grow(amount)
1661   except errors.BlockDeviceError, err:
1662     return False, str(err)
1663
1664   return True, None
1665
1666
1667 def BlockdevSnapshot(disk):
1668   """Create a snapshot copy of a block device.
1669
1670   This function is called recursively, and the snapshot is actually created
1671   just for the leaf lvm backend device.
1672
1673   @type disk: L{objects.Disk}
1674   @param disk: the disk to be snapshotted
1675   @rtype: string
1676   @return: snapshot disk path
1677
1678   """
1679   if disk.children:
1680     if len(disk.children) == 1:
1681       # only one child, let's recurse on it
1682       return BlockdevSnapshot(disk.children[0])
1683     else:
1684       # more than one child, choose one that matches
1685       for child in disk.children:
1686         if child.size == disk.size:
1687           # return implies breaking the loop
1688           return BlockdevSnapshot(child)
1689   elif disk.dev_type == constants.LD_LV:
1690     r_dev = _RecursiveFindBD(disk)
1691     if r_dev is not None:
1692       # let's stay on the safe side and ask for the full size, for now
1693       return r_dev.Snapshot(disk.size)
1694     else:
1695       return None
1696   else:
1697     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1698                                  " '%s' of type '%s'" %
1699                                  (disk.unique_id, disk.dev_type))
1700
1701
1702 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1703   """Export a block device snapshot to a remote node.
1704
1705   @type disk: L{objects.Disk}
1706   @param disk: the description of the disk to export
1707   @type dest_node: str
1708   @param dest_node: the destination node to export to
1709   @type instance: L{objects.Instance}
1710   @param instance: the instance object to whom the disk belongs
1711   @type cluster_name: str
1712   @param cluster_name: the cluster name, needed for SSH hostalias
1713   @type idx: int
1714   @param idx: the index of the disk in the instance's disk list,
1715       used to export to the OS scripts environment
1716   @rtype: boolean
1717   @return: the success of the operation
1718
1719   """
1720   export_env = OSEnvironment(instance)
1721
1722   inst_os = OSFromDisk(instance.os)
1723   export_script = inst_os.export_script
1724
1725   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1726                                      instance.name, int(time.time()))
1727   if not os.path.exists(constants.LOG_OS_DIR):
1728     os.mkdir(constants.LOG_OS_DIR, 0750)
1729   real_disk = _RecursiveFindBD(disk)
1730   if real_disk is None:
1731     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1732                                   str(disk))
1733   real_disk.Open()
1734
1735   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1736   export_env['EXPORT_INDEX'] = str(idx)
1737
1738   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1739   destfile = disk.physical_id[1]
1740
1741   # the target command is built out of three individual commands,
1742   # which are joined by pipes; we check each individual command for
1743   # valid parameters
1744   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1745                                export_script, logfile)
1746
1747   comprcmd = "gzip"
1748
1749   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1750                                 destdir, destdir, destfile)
1751   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1752                                                    constants.GANETI_RUNAS,
1753                                                    destcmd)
1754
1755   # all commands have been checked, so we're safe to combine them
1756   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1757
1758   result = utils.RunCmd(command, env=export_env)
1759
1760   if result.failed:
1761     logging.error("os snapshot export command '%s' returned error: %s"
1762                   " output: %s", command, result.fail_reason, result.output)
1763     return False
1764
1765   return True
1766
1767
1768 def FinalizeExport(instance, snap_disks):
1769   """Write out the export configuration information.
1770
1771   @type instance: L{objects.Instance}
1772   @param instance: the instance which we export, used for
1773       saving configuration
1774   @type snap_disks: list of L{objects.Disk}
1775   @param snap_disks: list of snapshot block devices, which
1776       will be used to get the actual name of the dump file
1777
1778   @rtype: boolean
1779   @return: the success of the operation
1780
1781   """
1782   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1783   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1784
1785   config = objects.SerializableConfigParser()
1786
1787   config.add_section(constants.INISECT_EXP)
1788   config.set(constants.INISECT_EXP, 'version', '0')
1789   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1790   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1791   config.set(constants.INISECT_EXP, 'os', instance.os)
1792   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1793
1794   config.add_section(constants.INISECT_INS)
1795   config.set(constants.INISECT_INS, 'name', instance.name)
1796   config.set(constants.INISECT_INS, 'memory', '%d' %
1797              instance.beparams[constants.BE_MEMORY])
1798   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1799              instance.beparams[constants.BE_VCPUS])
1800   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1801
1802   nic_total = 0
1803   for nic_count, nic in enumerate(instance.nics):
1804     nic_total += 1
1805     config.set(constants.INISECT_INS, 'nic%d_mac' %
1806                nic_count, '%s' % nic.mac)
1807     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1808     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1809                '%s' % nic.bridge)
1810   # TODO: redundant: on load can read nics until it doesn't exist
1811   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1812
1813   disk_total = 0
1814   for disk_count, disk in enumerate(snap_disks):
1815     if disk:
1816       disk_total += 1
1817       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1818                  ('%s' % disk.iv_name))
1819       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1820                  ('%s' % disk.physical_id[1]))
1821       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1822                  ('%d' % disk.size))
1823
1824   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1825
1826   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1827                   data=config.Dumps())
1828   shutil.rmtree(finaldestdir, True)
1829   shutil.move(destdir, finaldestdir)
1830
1831   return True
1832
1833
1834 def ExportInfo(dest):
1835   """Get export configuration information.
1836
1837   @type dest: str
1838   @param dest: directory containing the export
1839
1840   @rtype: L{objects.SerializableConfigParser}
1841   @return: a serializable config file containing the
1842       export info
1843
1844   """
1845   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1846
1847   config = objects.SerializableConfigParser()
1848   config.read(cff)
1849
1850   if (not config.has_section(constants.INISECT_EXP) or
1851       not config.has_section(constants.INISECT_INS)):
1852     return None
1853
1854   return config
1855
1856
1857 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1858   """Import an os image into an instance.
1859
1860   @type instance: L{objects.Instance}
1861   @param instance: instance to import the disks into
1862   @type src_node: string
1863   @param src_node: source node for the disk images
1864   @type src_images: list of string
1865   @param src_images: absolute paths of the disk images
1866   @rtype: list of boolean
1867   @return: each boolean represent the success of importing the n-th disk
1868
1869   """
1870   import_env = OSEnvironment(instance)
1871   inst_os = OSFromDisk(instance.os)
1872   import_script = inst_os.import_script
1873
1874   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1875                                         instance.name, int(time.time()))
1876   if not os.path.exists(constants.LOG_OS_DIR):
1877     os.mkdir(constants.LOG_OS_DIR, 0750)
1878
1879   comprcmd = "gunzip"
1880   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1881                                import_script, logfile)
1882
1883   final_result = []
1884   for idx, image in enumerate(src_images):
1885     if image:
1886       destcmd = utils.BuildShellCmd('cat %s', image)
1887       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1888                                                        constants.GANETI_RUNAS,
1889                                                        destcmd)
1890       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1891       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1892       import_env['IMPORT_INDEX'] = str(idx)
1893       result = utils.RunCmd(command, env=import_env)
1894       if result.failed:
1895         logging.error("Disk import command '%s' returned error: %s"
1896                       " output: %s", command, result.fail_reason,
1897                       result.output)
1898         final_result.append(False)
1899       else:
1900         final_result.append(True)
1901     else:
1902       final_result.append(True)
1903
1904   return final_result
1905
1906
1907 def ListExports():
1908   """Return a list of exports currently available on this machine.
1909
1910   @rtype: list
1911   @return: list of the exports
1912
1913   """
1914   if os.path.isdir(constants.EXPORT_DIR):
1915     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1916   else:
1917     return []
1918
1919
1920 def RemoveExport(export):
1921   """Remove an existing export from the node.
1922
1923   @type export: str
1924   @param export: the name of the export to remove
1925   @rtype: boolean
1926   @return: the success of the operation
1927
1928   """
1929   target = os.path.join(constants.EXPORT_DIR, export)
1930
1931   shutil.rmtree(target)
1932   # TODO: catch some of the relevant exceptions and provide a pretty
1933   # error message if rmtree fails.
1934
1935   return True
1936
1937
1938 def BlockdevRename(devlist):
1939   """Rename a list of block devices.
1940
1941   @type devlist: list of tuples
1942   @param devlist: list of tuples of the form  (disk,
1943       new_logical_id, new_physical_id); disk is an
1944       L{objects.Disk} object describing the current disk,
1945       and new logical_id/physical_id is the name we
1946       rename it to
1947   @rtype: boolean
1948   @return: True if all renames succeeded, False otherwise
1949
1950   """
1951   result = True
1952   for disk, unique_id in devlist:
1953     dev = _RecursiveFindBD(disk)
1954     if dev is None:
1955       result = False
1956       continue
1957     try:
1958       old_rpath = dev.dev_path
1959       dev.Rename(unique_id)
1960       new_rpath = dev.dev_path
1961       if old_rpath != new_rpath:
1962         DevCacheManager.RemoveCache(old_rpath)
1963         # FIXME: we should add the new cache information here, like:
1964         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1965         # but we don't have the owner here - maybe parse from existing
1966         # cache? for now, we only lose lvm data when we rename, which
1967         # is less critical than DRBD or MD
1968     except errors.BlockDeviceError, err:
1969       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1970       result = False
1971   return result
1972
1973
1974 def _TransformFileStorageDir(file_storage_dir):
1975   """Checks whether given file_storage_dir is valid.
1976
1977   Checks wheter the given file_storage_dir is within the cluster-wide
1978   default file_storage_dir stored in SimpleStore. Only paths under that
1979   directory are allowed.
1980
1981   @type file_storage_dir: str
1982   @param file_storage_dir: the path to check
1983
1984   @return: the normalized path if valid, None otherwise
1985
1986   """
1987   cfg = _GetConfig()
1988   file_storage_dir = os.path.normpath(file_storage_dir)
1989   base_file_storage_dir = cfg.GetFileStorageDir()
1990   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1991       base_file_storage_dir):
1992     logging.error("file storage directory '%s' is not under base file"
1993                   " storage directory '%s'",
1994                   file_storage_dir, base_file_storage_dir)
1995     return None
1996   return file_storage_dir
1997
1998
1999 def CreateFileStorageDir(file_storage_dir):
2000   """Create file storage directory.
2001
2002   @type file_storage_dir: str
2003   @param file_storage_dir: directory to create
2004
2005   @rtype: tuple
2006   @return: tuple with first element a boolean indicating wheter dir
2007       creation was successful or not
2008
2009   """
2010   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2011   result = True,
2012   if not file_storage_dir:
2013     result = False,
2014   else:
2015     if os.path.exists(file_storage_dir):
2016       if not os.path.isdir(file_storage_dir):
2017         logging.error("'%s' is not a directory", file_storage_dir)
2018         result = False,
2019     else:
2020       try:
2021         os.makedirs(file_storage_dir, 0750)
2022       except OSError, err:
2023         logging.error("Cannot create file storage directory '%s': %s",
2024                       file_storage_dir, err)
2025         result = False,
2026   return result
2027
2028
2029 def RemoveFileStorageDir(file_storage_dir):
2030   """Remove file storage directory.
2031
2032   Remove it only if it's empty. If not log an error and return.
2033
2034   @type file_storage_dir: str
2035   @param file_storage_dir: the directory we should cleanup
2036   @rtype: tuple (success,)
2037   @return: tuple of one element, C{success}, denoting
2038       whether the operation was successfull
2039
2040   """
2041   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2042   result = True,
2043   if not file_storage_dir:
2044     result = False,
2045   else:
2046     if os.path.exists(file_storage_dir):
2047       if not os.path.isdir(file_storage_dir):
2048         logging.error("'%s' is not a directory", file_storage_dir)
2049         result = False,
2050       # deletes dir only if empty, otherwise we want to return False
2051       try:
2052         os.rmdir(file_storage_dir)
2053       except OSError, err:
2054         logging.exception("Cannot remove file storage directory '%s'",
2055                           file_storage_dir)
2056         result = False,
2057   return result
2058
2059
2060 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2061   """Rename the file storage directory.
2062
2063   @type old_file_storage_dir: str
2064   @param old_file_storage_dir: the current path
2065   @type new_file_storage_dir: str
2066   @param new_file_storage_dir: the name we should rename to
2067   @rtype: tuple (success,)
2068   @return: tuple of one element, C{success}, denoting
2069       whether the operation was successful
2070
2071   """
2072   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2073   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2074   result = True,
2075   if not old_file_storage_dir or not new_file_storage_dir:
2076     result = False,
2077   else:
2078     if not os.path.exists(new_file_storage_dir):
2079       if os.path.isdir(old_file_storage_dir):
2080         try:
2081           os.rename(old_file_storage_dir, new_file_storage_dir)
2082         except OSError, err:
2083           logging.exception("Cannot rename '%s' to '%s'",
2084                             old_file_storage_dir, new_file_storage_dir)
2085           result =  False,
2086       else:
2087         logging.error("'%s' is not a directory", old_file_storage_dir)
2088         result = False,
2089     else:
2090       if os.path.exists(old_file_storage_dir):
2091         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2092                       old_file_storage_dir, new_file_storage_dir)
2093         result = False,
2094   return result
2095
2096
2097 def _IsJobQueueFile(file_name):
2098   """Checks whether the given filename is in the queue directory.
2099
2100   @type file_name: str
2101   @param file_name: the file name we should check
2102   @rtype: boolean
2103   @return: whether the file is under the queue directory
2104
2105   """
2106   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2107   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2108
2109   if not result:
2110     logging.error("'%s' is not a file in the queue directory",
2111                   file_name)
2112
2113   return result
2114
2115
2116 def JobQueueUpdate(file_name, content):
2117   """Updates a file in the queue directory.
2118
2119   This is just a wrapper over L{utils.WriteFile}, with proper
2120   checking.
2121
2122   @type file_name: str
2123   @param file_name: the job file name
2124   @type content: str
2125   @param content: the new job contents
2126   @rtype: boolean
2127   @return: the success of the operation
2128
2129   """
2130   if not _IsJobQueueFile(file_name):
2131     return False
2132
2133   # Write and replace the file atomically
2134   utils.WriteFile(file_name, data=_Decompress(content))
2135
2136   return True
2137
2138
2139 def JobQueueRename(old, new):
2140   """Renames a job queue file.
2141
2142   This is just a wrapper over os.rename with proper checking.
2143
2144   @type old: str
2145   @param old: the old (actual) file name
2146   @type new: str
2147   @param new: the desired file name
2148   @rtype: boolean
2149   @return: the success of the operation
2150
2151   """
2152   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2153     return False
2154
2155   utils.RenameFile(old, new, mkdir=True)
2156
2157   return True
2158
2159
2160 def JobQueueSetDrainFlag(drain_flag):
2161   """Set the drain flag for the queue.
2162
2163   This will set or unset the queue drain flag.
2164
2165   @type drain_flag: boolean
2166   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2167   @rtype: boolean
2168   @return: always True
2169   @warning: the function always returns True
2170
2171   """
2172   if drain_flag:
2173     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2174   else:
2175     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2176
2177   return True
2178
2179
2180 def BlockdevClose(instance_name, disks):
2181   """Closes the given block devices.
2182
2183   This means they will be switched to secondary mode (in case of
2184   DRBD).
2185
2186   @param instance_name: if the argument is not empty, the symlinks
2187       of this instance will be removed
2188   @type disks: list of L{objects.Disk}
2189   @param disks: the list of disks to be closed
2190   @rtype: tuple (success, message)
2191   @return: a tuple of success and message, where success
2192       indicates the succes of the operation, and message
2193       which will contain the error details in case we
2194       failed
2195
2196   """
2197   bdevs = []
2198   for cf in disks:
2199     rd = _RecursiveFindBD(cf)
2200     if rd is None:
2201       return (False, "Can't find device %s" % cf)
2202     bdevs.append(rd)
2203
2204   msg = []
2205   for rd in bdevs:
2206     try:
2207       rd.Close()
2208     except errors.BlockDeviceError, err:
2209       msg.append(str(err))
2210   if msg:
2211     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2212   else:
2213     if instance_name:
2214       _RemoveBlockDevLinks(instance_name, disks)
2215     return (True, "All devices secondary")
2216
2217
2218 def ValidateHVParams(hvname, hvparams):
2219   """Validates the given hypervisor parameters.
2220
2221   @type hvname: string
2222   @param hvname: the hypervisor name
2223   @type hvparams: dict
2224   @param hvparams: the hypervisor parameters to be validated
2225   @rtype: tuple (success, message)
2226   @return: a tuple of success and message, where success
2227       indicates the succes of the operation, and message
2228       which will contain the error details in case we
2229       failed
2230
2231   """
2232   try:
2233     hv_type = hypervisor.GetHypervisor(hvname)
2234     hv_type.ValidateParameters(hvparams)
2235     return (True, "Validation passed")
2236   except errors.HypervisorError, err:
2237     return (False, str(err))
2238
2239
2240 def DemoteFromMC():
2241   """Demotes the current node from master candidate role.
2242
2243   """
2244   # try to ensure we're not the master by mistake
2245   master, myself = ssconf.GetMasterAndMyself()
2246   if master == myself:
2247     return (False, "ssconf status shows I'm the master node, will not demote")
2248   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2249   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2250     return (False, "The master daemon is running, will not demote")
2251   try:
2252     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2253   except EnvironmentError, err:
2254     if err.errno != errno.ENOENT:
2255       return (False, "Error while backing up cluster file: %s" % str(err))
2256   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2257   return (True, "Done")
2258
2259
2260 def _FindDisks(nodes_ip, disks):
2261   """Sets the physical ID on disks and returns the block devices.
2262
2263   """
2264   # set the correct physical ID
2265   my_name = utils.HostInfo().name
2266   for cf in disks:
2267     cf.SetPhysicalID(my_name, nodes_ip)
2268
2269   bdevs = []
2270
2271   for cf in disks:
2272     rd = _RecursiveFindBD(cf)
2273     if rd is None:
2274       return (False, "Can't find device %s" % cf)
2275     bdevs.append(rd)
2276   return (True, bdevs)
2277
2278
2279 def DrbdDisconnectNet(nodes_ip, disks):
2280   """Disconnects the network on a list of drbd devices.
2281
2282   """
2283   status, bdevs = _FindDisks(nodes_ip, disks)
2284   if not status:
2285     return status, bdevs
2286
2287   # disconnect disks
2288   for rd in bdevs:
2289     try:
2290       rd.DisconnectNet()
2291     except errors.BlockDeviceError, err:
2292       logging.exception("Failed to go into standalone mode")
2293       return (False, "Can't change network configuration: %s" % str(err))
2294   return (True, "All disks are now disconnected")
2295
2296
2297 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2298   """Attaches the network on a list of drbd devices.
2299
2300   """
2301   status, bdevs = _FindDisks(nodes_ip, disks)
2302   if not status:
2303     return status, bdevs
2304
2305   if multimaster:
2306     for idx, rd in enumerate(bdevs):
2307       try:
2308         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2309       except EnvironmentError, err:
2310         return (False, "Can't create symlink: %s" % str(err))
2311   # reconnect disks, switch to new master configuration and if
2312   # needed primary mode
2313   for rd in bdevs:
2314     try:
2315       rd.AttachNet(multimaster)
2316     except errors.BlockDeviceError, err:
2317       return (False, "Can't change network configuration: %s" % str(err))
2318   # wait until the disks are connected; we need to retry the re-attach
2319   # if the device becomes standalone, as this might happen if the one
2320   # node disconnects and reconnects in a different mode before the
2321   # other node reconnects; in this case, one or both of the nodes will
2322   # decide it has wrong configuration and switch to standalone
2323   RECONNECT_TIMEOUT = 2 * 60
2324   sleep_time = 0.100 # start with 100 miliseconds
2325   timeout_limit = time.time() + RECONNECT_TIMEOUT
2326   while time.time() < timeout_limit:
2327     all_connected = True
2328     for rd in bdevs:
2329       stats = rd.GetProcStatus()
2330       if not (stats.is_connected or stats.is_in_resync):
2331         all_connected = False
2332       if stats.is_standalone:
2333         # peer had different config info and this node became
2334         # standalone, even though this should not happen with the
2335         # new staged way of changing disk configs
2336         try:
2337           rd.ReAttachNet(multimaster)
2338         except errors.BlockDeviceError, err:
2339           return (False, "Can't change network configuration: %s" % str(err))
2340     if all_connected:
2341       break
2342     time.sleep(sleep_time)
2343     sleep_time = min(5, sleep_time * 1.5)
2344   if not all_connected:
2345     return (False, "Timeout in disk reconnecting")
2346   if multimaster:
2347     # change to primary mode
2348     for rd in bdevs:
2349       try:
2350         rd.Open()
2351       except errors.BlockDeviceError, err:
2352         return (False, "Can't change to primary mode: %s" % str(err))
2353   if multimaster:
2354     msg = "multi-master and primary"
2355   else:
2356     msg = "single-master"
2357   return (True, "Disks are now configured as %s" % msg)
2358
2359
2360 def DrbdWaitSync(nodes_ip, disks):
2361   """Wait until DRBDs have synchronized.
2362
2363   """
2364   status, bdevs = _FindDisks(nodes_ip, disks)
2365   if not status:
2366     return status, bdevs
2367
2368   min_resync = 100
2369   alldone = True
2370   failure = False
2371   for rd in bdevs:
2372     stats = rd.GetProcStatus()
2373     if not (stats.is_connected or stats.is_in_resync):
2374       failure = True
2375       break
2376     alldone = alldone and (not stats.is_in_resync)
2377     if stats.sync_percent is not None:
2378       min_resync = min(min_resync, stats.sync_percent)
2379   return (not failure, (alldone, min_resync))
2380
2381
2382 class HooksRunner(object):
2383   """Hook runner.
2384
2385   This class is instantiated on the node side (ganeti-noded) and not
2386   on the master side.
2387
2388   """
2389   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2390
2391   def __init__(self, hooks_base_dir=None):
2392     """Constructor for hooks runner.
2393
2394     @type hooks_base_dir: str or None
2395     @param hooks_base_dir: if not None, this overrides the
2396         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2397
2398     """
2399     if hooks_base_dir is None:
2400       hooks_base_dir = constants.HOOKS_BASE_DIR
2401     self._BASE_DIR = hooks_base_dir
2402
2403   @staticmethod
2404   def ExecHook(script, env):
2405     """Exec one hook script.
2406
2407     @type script: str
2408     @param script: the full path to the script
2409     @type env: dict
2410     @param env: the environment with which to exec the script
2411     @rtype: tuple (success, message)
2412     @return: a tuple of success and message, where success
2413         indicates the succes of the operation, and message
2414         which will contain the error details in case we
2415         failed
2416
2417     """
2418     # exec the process using subprocess and log the output
2419     fdstdin = None
2420     try:
2421       fdstdin = open("/dev/null", "r")
2422       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2423                                stderr=subprocess.STDOUT, close_fds=True,
2424                                shell=False, cwd="/", env=env)
2425       output = ""
2426       try:
2427         output = child.stdout.read(4096)
2428         child.stdout.close()
2429       except EnvironmentError, err:
2430         output += "Hook script error: %s" % str(err)
2431
2432       while True:
2433         try:
2434           result = child.wait()
2435           break
2436         except EnvironmentError, err:
2437           if err.errno == errno.EINTR:
2438             continue
2439           raise
2440     finally:
2441       # try not to leak fds
2442       for fd in (fdstdin, ):
2443         if fd is not None:
2444           try:
2445             fd.close()
2446           except EnvironmentError, err:
2447             # just log the error
2448             #logging.exception("Error while closing fd %s", fd)
2449             pass
2450
2451     return result == 0, utils.SafeEncode(output.strip())
2452
2453   def RunHooks(self, hpath, phase, env):
2454     """Run the scripts in the hooks directory.
2455
2456     @type hpath: str
2457     @param hpath: the path to the hooks directory which
2458         holds the scripts
2459     @type phase: str
2460     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2461         L{constants.HOOKS_PHASE_POST}
2462     @type env: dict
2463     @param env: dictionary with the environment for the hook
2464     @rtype: list
2465     @return: list of 3-element tuples:
2466       - script path
2467       - script result, either L{constants.HKR_SUCCESS} or
2468         L{constants.HKR_FAIL}
2469       - output of the script
2470
2471     @raise errors.ProgrammerError: for invalid input
2472         parameters
2473
2474     """
2475     if phase == constants.HOOKS_PHASE_PRE:
2476       suffix = "pre"
2477     elif phase == constants.HOOKS_PHASE_POST:
2478       suffix = "post"
2479     else:
2480       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2481     rr = []
2482
2483     subdir = "%s-%s.d" % (hpath, suffix)
2484     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2485     try:
2486       dir_contents = utils.ListVisibleFiles(dir_name)
2487     except OSError, err:
2488       # FIXME: must log output in case of failures
2489       return rr
2490
2491     # we use the standard python sort order,
2492     # so 00name is the recommended naming scheme
2493     dir_contents.sort()
2494     for relname in dir_contents:
2495       fname = os.path.join(dir_name, relname)
2496       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2497           self.RE_MASK.match(relname) is not None):
2498         rrval = constants.HKR_SKIP
2499         output = ""
2500       else:
2501         result, output = self.ExecHook(fname, env)
2502         if not result:
2503           rrval = constants.HKR_FAIL
2504         else:
2505           rrval = constants.HKR_SUCCESS
2506       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2507
2508     return rr
2509
2510
2511 class IAllocatorRunner(object):
2512   """IAllocator runner.
2513
2514   This class is instantiated on the node side (ganeti-noded) and not on
2515   the master side.
2516
2517   """
2518   def Run(self, name, idata):
2519     """Run an iallocator script.
2520
2521     @type name: str
2522     @param name: the iallocator script name
2523     @type idata: str
2524     @param idata: the allocator input data
2525
2526     @rtype: tuple
2527     @return: four element tuple of:
2528        - run status (one of the IARUN_ constants)
2529        - stdout
2530        - stderr
2531        - fail reason (as from L{utils.RunResult})
2532
2533     """
2534     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2535                                   os.path.isfile)
2536     if alloc_script is None:
2537       return (constants.IARUN_NOTFOUND, None, None, None)
2538
2539     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2540     try:
2541       os.write(fd, idata)
2542       os.close(fd)
2543       result = utils.RunCmd([alloc_script, fin_name])
2544       if result.failed:
2545         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2546                 result.fail_reason)
2547     finally:
2548       os.unlink(fin_name)
2549
2550     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2551
2552
2553 class DevCacheManager(object):
2554   """Simple class for managing a cache of block device information.
2555
2556   """
2557   _DEV_PREFIX = "/dev/"
2558   _ROOT_DIR = constants.BDEV_CACHE_DIR
2559
2560   @classmethod
2561   def _ConvertPath(cls, dev_path):
2562     """Converts a /dev/name path to the cache file name.
2563
2564     This replaces slashes with underscores and strips the /dev
2565     prefix. It then returns the full path to the cache file.
2566
2567     @type dev_path: str
2568     @param dev_path: the C{/dev/} path name
2569     @rtype: str
2570     @return: the converted path name
2571
2572     """
2573     if dev_path.startswith(cls._DEV_PREFIX):
2574       dev_path = dev_path[len(cls._DEV_PREFIX):]
2575     dev_path = dev_path.replace("/", "_")
2576     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2577     return fpath
2578
2579   @classmethod
2580   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2581     """Updates the cache information for a given device.
2582
2583     @type dev_path: str
2584     @param dev_path: the pathname of the device
2585     @type owner: str
2586     @param owner: the owner (instance name) of the device
2587     @type on_primary: bool
2588     @param on_primary: whether this is the primary
2589         node nor not
2590     @type iv_name: str
2591     @param iv_name: the instance-visible name of the
2592         device, as in objects.Disk.iv_name
2593
2594     @rtype: None
2595
2596     """
2597     if dev_path is None:
2598       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2599       return
2600     fpath = cls._ConvertPath(dev_path)
2601     if on_primary:
2602       state = "primary"
2603     else:
2604       state = "secondary"
2605     if iv_name is None:
2606       iv_name = "not_visible"
2607     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2608     try:
2609       utils.WriteFile(fpath, data=fdata)
2610     except EnvironmentError, err:
2611       logging.exception("Can't update bdev cache for %s", dev_path)
2612
2613   @classmethod
2614   def RemoveCache(cls, dev_path):
2615     """Remove data for a dev_path.
2616
2617     This is just a wrapper over L{utils.RemoveFile} with a converted
2618     path name and logging.
2619
2620     @type dev_path: str
2621     @param dev_path: the pathname of the device
2622
2623     @rtype: None
2624
2625     """
2626     if dev_path is None:
2627       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2628       return
2629     fpath = cls._ConvertPath(dev_path)
2630     try:
2631       utils.RemoveFile(fpath)
2632     except EnvironmentError, err:
2633       logging.exception("Can't update bdev cache for %s", dev_path)