c865b0fc1317654bafb0791ae1ccf21aa6823b9d
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons, no_voting):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @type no_voting: boolean
161   @param no_voting: whether to start ganeti-masterd without a node vote
162       (if start_daemons is True), but still non-interactively
163   @rtype: None
164
165   """
166   ok = True
167   master_netdev, master_ip, _ = GetMasterInfo()
168   if not master_netdev:
169     return False
170
171   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
172     if utils.OwnIpAddress(master_ip):
173       # we already have the ip:
174       logging.debug("Already started")
175     else:
176       logging.error("Someone else has the master ip, not activating")
177       ok = False
178   else:
179     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
180                            "dev", master_netdev, "label",
181                            "%s:0" % master_netdev])
182     if result.failed:
183       logging.error("Can't activate master IP: %s", result.output)
184       ok = False
185
186     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
187                            "-s", master_ip, master_ip])
188     # we'll ignore the exit code of arping
189
190   # and now start the master and rapi daemons
191   if start_daemons:
192     daemons_params = {
193         'ganeti-masterd': [],
194         'ganeti-rapi': [],
195         }
196     if no_voting:
197       daemons_params['ganeti-masterd'].append('--no-voting')
198       daemons_params['ganeti-masterd'].append('--yes-do-it')
199     for daemon in daemons_params:
200       cmd = [daemon]
201       cmd.extend(daemons_params[daemon])
202       result = utils.RunCmd(cmd)
203       if result.failed:
204         logging.error("Can't start daemon %s: %s", daemon, result.output)
205         ok = False
206   return ok
207
208
209 def StopMaster(stop_daemons):
210   """Deactivate this node as master.
211
212   The function will always try to deactivate the IP address of the
213   master. It will also stop the master daemons depending on the
214   stop_daemons parameter.
215
216   @type stop_daemons: boolean
217   @param stop_daemons: whether to also stop the master daemons
218       (ganeti-masterd and ganeti-rapi)
219   @rtype: None
220
221   """
222   master_netdev, master_ip, _ = GetMasterInfo()
223   if not master_netdev:
224     return False
225
226   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
227                          "dev", master_netdev])
228   if result.failed:
229     logging.error("Can't remove the master IP, error: %s", result.output)
230     # but otherwise ignore the failure
231
232   if stop_daemons:
233     # stop/kill the rapi and the master daemon
234     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
235       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
236
237   return True
238
239
240 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
241   """Joins this node to the cluster.
242
243   This does the following:
244       - updates the hostkeys of the machine (rsa and dsa)
245       - adds the ssh private key to the user
246       - adds the ssh public key to the users' authorized_keys file
247
248   @type dsa: str
249   @param dsa: the DSA private key to write
250   @type dsapub: str
251   @param dsapub: the DSA public key to write
252   @type rsa: str
253   @param rsa: the RSA private key to write
254   @type rsapub: str
255   @param rsapub: the RSA public key to write
256   @type sshkey: str
257   @param sshkey: the SSH private key to write
258   @type sshpub: str
259   @param sshpub: the SSH public key to write
260   @rtype: boolean
261   @return: the success of the operation
262
263   """
264   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
265                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
266                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
267                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
268   for name, content, mode in sshd_keys:
269     utils.WriteFile(name, data=content, mode=mode)
270
271   try:
272     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
273                                                     mkdir=True)
274   except errors.OpExecError, err:
275     msg = "Error while processing user ssh files"
276     logging.exception(msg)
277     return (False, "%s: %s" % (msg, err))
278
279   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
280     utils.WriteFile(name, data=content, mode=0600)
281
282   utils.AddAuthorizedKey(auth_keys, sshpub)
283
284   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
285
286   return (True, "Node added successfully")
287
288
289 def LeaveCluster():
290   """Cleans up and remove the current node.
291
292   This function cleans up and prepares the current node to be removed
293   from the cluster.
294
295   If processing is successful, then it raises an
296   L{errors.QuitGanetiException} which is used as a special case to
297   shutdown the node daemon.
298
299   """
300   _CleanDirectory(constants.DATA_DIR)
301   JobQueuePurge()
302
303   try:
304     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
305   except errors.OpExecError:
306     logging.exception("Error while processing ssh files")
307     return
308
309   f = open(pub_key, 'r')
310   try:
311     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
312   finally:
313     f.close()
314
315   utils.RemoveFile(priv_key)
316   utils.RemoveFile(pub_key)
317
318   # Return a reassuring string to the caller, and quit
319   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
320
321
322 def GetNodeInfo(vgname, hypervisor_type):
323   """Gives back a hash with different informations about the node.
324
325   @type vgname: C{string}
326   @param vgname: the name of the volume group to ask for disk space information
327   @type hypervisor_type: C{str}
328   @param hypervisor_type: the name of the hypervisor to ask for
329       memory information
330   @rtype: C{dict}
331   @return: dictionary with the following keys:
332       - vg_size is the size of the configured volume group in MiB
333       - vg_free is the free size of the volume group in MiB
334       - memory_dom0 is the memory allocated for domain0 in MiB
335       - memory_free is the currently available (free) ram in MiB
336       - memory_total is the total number of ram in MiB
337
338   """
339   outputarray = {}
340   vginfo = _GetVGInfo(vgname)
341   outputarray['vg_size'] = vginfo['vg_size']
342   outputarray['vg_free'] = vginfo['vg_free']
343
344   hyper = hypervisor.GetHypervisor(hypervisor_type)
345   hyp_info = hyper.GetNodeInfo()
346   if hyp_info is not None:
347     outputarray.update(hyp_info)
348
349   f = open("/proc/sys/kernel/random/boot_id", 'r')
350   try:
351     outputarray["bootid"] = f.read(128).rstrip("\n")
352   finally:
353     f.close()
354
355   return outputarray
356
357
358 def VerifyNode(what, cluster_name):
359   """Verify the status of the local node.
360
361   Based on the input L{what} parameter, various checks are done on the
362   local node.
363
364   If the I{filelist} key is present, this list of
365   files is checksummed and the file/checksum pairs are returned.
366
367   If the I{nodelist} key is present, we check that we have
368   connectivity via ssh with the target nodes (and check the hostname
369   report).
370
371   If the I{node-net-test} key is present, we check that we have
372   connectivity to the given nodes via both primary IP and, if
373   applicable, secondary IPs.
374
375   @type what: C{dict}
376   @param what: a dictionary of things to check:
377       - filelist: list of files for which to compute checksums
378       - nodelist: list of nodes we should check ssh communication with
379       - node-net-test: list of nodes we should check node daemon port
380         connectivity with
381       - hypervisor: list with hypervisors to run the verify for
382   @rtype: dict
383   @return: a dictionary with the same keys as the input dict, and
384       values representing the result of the checks
385
386   """
387   result = {}
388
389   if constants.NV_HYPERVISOR in what:
390     result[constants.NV_HYPERVISOR] = tmp = {}
391     for hv_name in what[constants.NV_HYPERVISOR]:
392       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
393
394   if constants.NV_FILELIST in what:
395     result[constants.NV_FILELIST] = utils.FingerprintFiles(
396       what[constants.NV_FILELIST])
397
398   if constants.NV_NODELIST in what:
399     result[constants.NV_NODELIST] = tmp = {}
400     random.shuffle(what[constants.NV_NODELIST])
401     for node in what[constants.NV_NODELIST]:
402       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
403       if not success:
404         tmp[node] = message
405
406   if constants.NV_NODENETTEST in what:
407     result[constants.NV_NODENETTEST] = tmp = {}
408     my_name = utils.HostInfo().name
409     my_pip = my_sip = None
410     for name, pip, sip in what[constants.NV_NODENETTEST]:
411       if name == my_name:
412         my_pip = pip
413         my_sip = sip
414         break
415     if not my_pip:
416       tmp[my_name] = ("Can't find my own primary/secondary IP"
417                       " in the node list")
418     else:
419       port = utils.GetNodeDaemonPort()
420       for name, pip, sip in what[constants.NV_NODENETTEST]:
421         fail = []
422         if not utils.TcpPing(pip, port, source=my_pip):
423           fail.append("primary")
424         if sip != pip:
425           if not utils.TcpPing(sip, port, source=my_sip):
426             fail.append("secondary")
427         if fail:
428           tmp[name] = ("failure using the %s interface(s)" %
429                        " and ".join(fail))
430
431   if constants.NV_LVLIST in what:
432     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
433
434   if constants.NV_INSTANCELIST in what:
435     result[constants.NV_INSTANCELIST] = GetInstanceList(
436       what[constants.NV_INSTANCELIST])
437
438   if constants.NV_VGLIST in what:
439     result[constants.NV_VGLIST] = ListVolumeGroups()
440
441   if constants.NV_VERSION in what:
442     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
443                                     constants.RELEASE_VERSION)
444
445   if constants.NV_HVINFO in what:
446     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
447     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
448
449   if constants.NV_DRBDLIST in what:
450     try:
451       used_minors = bdev.DRBD8.GetUsedDevs().keys()
452     except errors.BlockDeviceError, err:
453       logging.warning("Can't get used minors list", exc_info=True)
454       used_minors = str(err)
455     result[constants.NV_DRBDLIST] = used_minors
456
457   return result
458
459
460 def GetVolumeList(vg_name):
461   """Compute list of logical volumes and their size.
462
463   @type vg_name: str
464   @param vg_name: the volume group whose LVs we should list
465   @rtype: dict
466   @return:
467       dictionary of all partions (key) with value being a tuple of
468       their size (in MiB), inactive and online status::
469
470         {'test1': ('20.06', True, True)}
471
472       in case of errors, a string is returned with the error
473       details.
474
475   """
476   lvs = {}
477   sep = '|'
478   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
479                          "--separator=%s" % sep,
480                          "-olv_name,lv_size,lv_attr", vg_name])
481   if result.failed:
482     logging.error("Failed to list logical volumes, lvs output: %s",
483                   result.output)
484     return result.output
485
486   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
487   for line in result.stdout.splitlines():
488     line = line.strip()
489     match = valid_line_re.match(line)
490     if not match:
491       logging.error("Invalid line returned from lvs output: '%s'", line)
492       continue
493     name, size, attr = match.groups()
494     inactive = attr[4] == '-'
495     online = attr[5] == 'o'
496     lvs[name] = (size, inactive, online)
497
498   return lvs
499
500
501 def ListVolumeGroups():
502   """List the volume groups and their size.
503
504   @rtype: dict
505   @return: dictionary with keys volume name and values the
506       size of the volume
507
508   """
509   return utils.ListVolumeGroups()
510
511
512 def NodeVolumes():
513   """List all volumes on this node.
514
515   @rtype: list
516   @return:
517     A list of dictionaries, each having four keys:
518       - name: the logical volume name,
519       - size: the size of the logical volume
520       - dev: the physical device on which the LV lives
521       - vg: the volume group to which it belongs
522
523     In case of errors, we return an empty list and log the
524     error.
525
526     Note that since a logical volume can live on multiple physical
527     volumes, the resulting list might include a logical volume
528     multiple times.
529
530   """
531   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
532                          "--separator=|",
533                          "--options=lv_name,lv_size,devices,vg_name"])
534   if result.failed:
535     logging.error("Failed to list logical volumes, lvs output: %s",
536                   result.output)
537     return []
538
539   def parse_dev(dev):
540     if '(' in dev:
541       return dev.split('(')[0]
542     else:
543       return dev
544
545   def map_line(line):
546     return {
547       'name': line[0].strip(),
548       'size': line[1].strip(),
549       'dev': parse_dev(line[2].strip()),
550       'vg': line[3].strip(),
551     }
552
553   return [map_line(line.split('|')) for line in result.stdout.splitlines()
554           if line.count('|') >= 3]
555
556
557 def BridgesExist(bridges_list):
558   """Check if a list of bridges exist on the current node.
559
560   @rtype: boolean
561   @return: C{True} if all of them exist, C{False} otherwise
562
563   """
564   for bridge in bridges_list:
565     if not utils.BridgeExists(bridge):
566       return False
567
568   return True
569
570
571 def GetInstanceList(hypervisor_list):
572   """Provides a list of instances.
573
574   @type hypervisor_list: list
575   @param hypervisor_list: the list of hypervisors to query information
576
577   @rtype: list
578   @return: a list of all running instances on the current node
579     - instance1.example.com
580     - instance2.example.com
581
582   """
583   results = []
584   for hname in hypervisor_list:
585     try:
586       names = hypervisor.GetHypervisor(hname).ListInstances()
587       results.extend(names)
588     except errors.HypervisorError, err:
589       logging.exception("Error enumerating instances for hypevisor %s", hname)
590       raise
591
592   return results
593
594
595 def GetInstanceInfo(instance, hname):
596   """Gives back the informations about an instance as a dictionary.
597
598   @type instance: string
599   @param instance: the instance name
600   @type hname: string
601   @param hname: the hypervisor type of the instance
602
603   @rtype: dict
604   @return: dictionary with the following keys:
605       - memory: memory size of instance (int)
606       - state: xen state of instance (string)
607       - time: cpu time of instance (float)
608
609   """
610   output = {}
611
612   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
613   if iinfo is not None:
614     output['memory'] = iinfo[2]
615     output['state'] = iinfo[4]
616     output['time'] = iinfo[5]
617
618   return output
619
620
621 def GetInstanceMigratable(instance):
622   """Gives whether an instance can be migrated.
623
624   @type instance: L{objects.Instance}
625   @param instance: object representing the instance to be checked.
626
627   @rtype: tuple
628   @return: tuple of (result, description) where:
629       - result: whether the instance can be migrated or not
630       - description: a description of the issue, if relevant
631
632   """
633   hyper = hypervisor.GetHypervisor(instance.hypervisor)
634   if instance.name not in hyper.ListInstances():
635     return (False, 'not running')
636
637   for idx in range(len(instance.disks)):
638     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
639     if not os.path.islink(link_name):
640       return (False, 'not restarted since ganeti 1.2.5')
641
642   return (True, '')
643
644
645 def GetAllInstancesInfo(hypervisor_list):
646   """Gather data about all instances.
647
648   This is the equivalent of L{GetInstanceInfo}, except that it
649   computes data for all instances at once, thus being faster if one
650   needs data about more than one instance.
651
652   @type hypervisor_list: list
653   @param hypervisor_list: list of hypervisors to query for instance data
654
655   @rtype: dict
656   @return: dictionary of instance: data, with data having the following keys:
657       - memory: memory size of instance (int)
658       - state: xen state of instance (string)
659       - time: cpu time of instance (float)
660       - vcpus: the number of vcpus
661
662   """
663   output = {}
664
665   for hname in hypervisor_list:
666     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
667     if iinfo:
668       for name, inst_id, memory, vcpus, state, times in iinfo:
669         value = {
670           'memory': memory,
671           'vcpus': vcpus,
672           'state': state,
673           'time': times,
674           }
675         if name in output:
676           # we only check static parameters, like memory and vcpus,
677           # and not state and time which can change between the
678           # invocations of the different hypervisors
679           for key in 'memory', 'vcpus':
680             if value[key] != output[name][key]:
681               raise errors.HypervisorError("Instance %s is running twice"
682                                            " with different parameters" % name)
683         output[name] = value
684
685   return output
686
687
688 def InstanceOsAdd(instance):
689   """Add an OS to an instance.
690
691   @type instance: L{objects.Instance}
692   @param instance: Instance whose OS is to be installed
693   @rtype: boolean
694   @return: the success of the operation
695
696   """
697   try:
698     inst_os = OSFromDisk(instance.os)
699   except errors.InvalidOS, err:
700     os_name, os_dir, os_err = err.args
701     if os_dir is None:
702       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
703     else:
704       return (False, "Error parsing OS '%s' in directory %s: %s" %
705               (os_name, os_dir, os_err))
706
707   create_env = OSEnvironment(instance)
708
709   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
710                                      instance.name, int(time.time()))
711
712   result = utils.RunCmd([inst_os.create_script], env=create_env,
713                         cwd=inst_os.path, output=logfile,)
714   if result.failed:
715     logging.error("os create command '%s' returned error: %s, logfile: %s,"
716                   " output: %s", result.cmd, result.fail_reason, logfile,
717                   result.output)
718     lines = [utils.SafeEncode(val)
719              for val in utils.TailFile(logfile, lines=20)]
720     return (False, "OS create script failed (%s), last lines in the"
721             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
722
723   return (True, "Successfully installed")
724
725
726 def RunRenameInstance(instance, old_name):
727   """Run the OS rename script for an instance.
728
729   @type instance: L{objects.Instance}
730   @param instance: Instance whose OS is to be installed
731   @type old_name: string
732   @param old_name: previous instance name
733   @rtype: boolean
734   @return: the success of the operation
735
736   """
737   inst_os = OSFromDisk(instance.os)
738
739   rename_env = OSEnvironment(instance)
740   rename_env['OLD_INSTANCE_NAME'] = old_name
741
742   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
743                                            old_name,
744                                            instance.name, int(time.time()))
745
746   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
747                         cwd=inst_os.path, output=logfile)
748
749   if result.failed:
750     logging.error("os create command '%s' returned error: %s output: %s",
751                   result.cmd, result.fail_reason, result.output)
752     lines = [utils.SafeEncode(val)
753              for val in utils.TailFile(logfile, lines=20)]
754     return (False, "OS rename script failed (%s), last lines in the"
755             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
756
757   return (True, "Rename successful")
758
759
760 def _GetVGInfo(vg_name):
761   """Get informations about the volume group.
762
763   @type vg_name: str
764   @param vg_name: the volume group which we query
765   @rtype: dict
766   @return:
767     A dictionary with the following keys:
768       - C{vg_size} is the total size of the volume group in MiB
769       - C{vg_free} is the free size of the volume group in MiB
770       - C{pv_count} are the number of physical disks in that VG
771
772     If an error occurs during gathering of data, we return the same dict
773     with keys all set to None.
774
775   """
776   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
777
778   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
779                          "--nosuffix", "--units=m", "--separator=:", vg_name])
780
781   if retval.failed:
782     logging.error("volume group %s not present", vg_name)
783     return retdic
784   valarr = retval.stdout.strip().rstrip(':').split(':')
785   if len(valarr) == 3:
786     try:
787       retdic = {
788         "vg_size": int(round(float(valarr[0]), 0)),
789         "vg_free": int(round(float(valarr[1]), 0)),
790         "pv_count": int(valarr[2]),
791         }
792     except ValueError, err:
793       logging.exception("Fail to parse vgs output")
794   else:
795     logging.error("vgs output has the wrong number of fields (expected"
796                   " three): %s", str(valarr))
797   return retdic
798
799
800 def _GetBlockDevSymlinkPath(instance_name, idx):
801   return os.path.join(constants.DISK_LINKS_DIR,
802                       "%s:%d" % (instance_name, idx))
803
804
805 def _SymlinkBlockDev(instance_name, device_path, idx):
806   """Set up symlinks to a instance's block device.
807
808   This is an auxiliary function run when an instance is start (on the primary
809   node) or when an instance is migrated (on the target node).
810
811
812   @param instance_name: the name of the target instance
813   @param device_path: path of the physical block device, on the node
814   @param idx: the disk index
815   @return: absolute path to the disk's symlink
816
817   """
818   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
819   try:
820     os.symlink(device_path, link_name)
821   except OSError, err:
822     if err.errno == errno.EEXIST:
823       if (not os.path.islink(link_name) or
824           os.readlink(link_name) != device_path):
825         os.remove(link_name)
826         os.symlink(device_path, link_name)
827     else:
828       raise
829
830   return link_name
831
832
833 def _RemoveBlockDevLinks(instance_name, disks):
834   """Remove the block device symlinks belonging to the given instance.
835
836   """
837   for idx, disk in enumerate(disks):
838     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
839     if os.path.islink(link_name):
840       try:
841         os.remove(link_name)
842       except OSError:
843         logging.exception("Can't remove symlink '%s'", link_name)
844
845
846 def _GatherAndLinkBlockDevs(instance):
847   """Set up an instance's block device(s).
848
849   This is run on the primary node at instance startup. The block
850   devices must be already assembled.
851
852   @type instance: L{objects.Instance}
853   @param instance: the instance whose disks we shoul assemble
854   @rtype: list
855   @return: list of (disk_object, device_path)
856
857   """
858   block_devices = []
859   for idx, disk in enumerate(instance.disks):
860     device = _RecursiveFindBD(disk)
861     if device is None:
862       raise errors.BlockDeviceError("Block device '%s' is not set up." %
863                                     str(disk))
864     device.Open()
865     try:
866       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
867     except OSError, e:
868       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
869                                     e.strerror)
870
871     block_devices.append((disk, link_name))
872
873   return block_devices
874
875
876 def StartInstance(instance):
877   """Start an instance.
878
879   @type instance: L{objects.Instance}
880   @param instance: the instance object
881   @rtype: boolean
882   @return: whether the startup was successful or not
883
884   """
885   running_instances = GetInstanceList([instance.hypervisor])
886
887   if instance.name in running_instances:
888     return (True, "Already running")
889
890   try:
891     block_devices = _GatherAndLinkBlockDevs(instance)
892     hyper = hypervisor.GetHypervisor(instance.hypervisor)
893     hyper.StartInstance(instance, block_devices)
894   except errors.BlockDeviceError, err:
895     logging.exception("Failed to start instance")
896     return (False, "Block device error: %s" % str(err))
897   except errors.HypervisorError, err:
898     logging.exception("Failed to start instance")
899     _RemoveBlockDevLinks(instance.name, instance.disks)
900     return (False, "Hypervisor error: %s" % str(err))
901
902   return (True, "Instance started successfully")
903
904
905 def InstanceShutdown(instance):
906   """Shut an instance down.
907
908   @note: this functions uses polling with a hardcoded timeout.
909
910   @type instance: L{objects.Instance}
911   @param instance: the instance object
912   @rtype: boolean
913   @return: whether the startup was successful or not
914
915   """
916   hv_name = instance.hypervisor
917   running_instances = GetInstanceList([hv_name])
918
919   if instance.name not in running_instances:
920     return (True, "Instance already stopped")
921
922   hyper = hypervisor.GetHypervisor(hv_name)
923   try:
924     hyper.StopInstance(instance)
925   except errors.HypervisorError, err:
926     msg = "Failed to stop instance %s: %s" % (instance.name, err)
927     logging.error(msg)
928     return (False, msg)
929
930   # test every 10secs for 2min
931
932   time.sleep(1)
933   for dummy in range(11):
934     if instance.name not in GetInstanceList([hv_name]):
935       break
936     time.sleep(10)
937   else:
938     # the shutdown did not succeed
939     logging.error("Shutdown of '%s' unsuccessful, using destroy",
940                   instance.name)
941
942     try:
943       hyper.StopInstance(instance, force=True)
944     except errors.HypervisorError, err:
945       msg = "Failed to force stop instance %s: %s" % (instance.name, err)
946       logging.error(msg)
947       return (False, msg)
948
949     time.sleep(1)
950     if instance.name in GetInstanceList([hv_name]):
951       msg = ("Could not shutdown instance %s even by destroy" %
952              instance.name)
953       logging.error(msg)
954       return (False, msg)
955
956   _RemoveBlockDevLinks(instance.name, instance.disks)
957
958   return (True, "Instance has been shutdown successfully")
959
960
961 def InstanceReboot(instance, reboot_type):
962   """Reboot an instance.
963
964   @type instance: L{objects.Instance}
965   @param instance: the instance object to reboot
966   @type reboot_type: str
967   @param reboot_type: the type of reboot, one the following
968     constants:
969       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
970         instance OS, do not recreate the VM
971       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
972         restart the VM (at the hypervisor level)
973       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
974         not accepted here, since that mode is handled differently, in
975         cmdlib, and translates into full stop and start of the
976         instance (instead of a call_instance_reboot RPC)
977   @rtype: boolean
978   @return: the success of the operation
979
980   """
981   running_instances = GetInstanceList([instance.hypervisor])
982
983   if instance.name not in running_instances:
984     msg = "Cannot reboot instance %s that is not running" % instance.name
985     logging.error(msg)
986     return (False, msg)
987
988   hyper = hypervisor.GetHypervisor(instance.hypervisor)
989   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
990     try:
991       hyper.RebootInstance(instance)
992     except errors.HypervisorError, err:
993       msg = "Failed to soft reboot instance %s: %s" % (instance.name, err)
994       logging.error(msg)
995       return (False, msg)
996   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
997     try:
998       stop_result = InstanceShutdown(instance)
999       if not stop_result[0]:
1000         return stop_result
1001       return StartInstance(instance)
1002     except errors.HypervisorError, err:
1003       msg = "Failed to hard reboot instance %s: %s" % (instance.name, err)
1004       logging.error(msg)
1005       return (False, msg)
1006   else:
1007     return (False, "Invalid reboot_type received: %s" % (reboot_type,))
1008
1009   return (True, "Reboot successful")
1010
1011
1012 def MigrationInfo(instance):
1013   """Gather information about an instance to be migrated.
1014
1015   @type instance: L{objects.Instance}
1016   @param instance: the instance definition
1017
1018   """
1019   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1020   try:
1021     info = hyper.MigrationInfo(instance)
1022   except errors.HypervisorError, err:
1023     msg = "Failed to fetch migration information"
1024     logging.exception(msg)
1025     return (False, '%s: %s' % (msg, err))
1026   return (True, info)
1027
1028
1029 def AcceptInstance(instance, info, target):
1030   """Prepare the node to accept an instance.
1031
1032   @type instance: L{objects.Instance}
1033   @param instance: the instance definition
1034   @type info: string/data (opaque)
1035   @param info: migration information, from the source node
1036   @type target: string
1037   @param target: target host (usually ip), on this node
1038
1039   """
1040   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1041   try:
1042     hyper.AcceptInstance(instance, info, target)
1043   except errors.HypervisorError, err:
1044     msg = "Failed to accept instance"
1045     logging.exception(msg)
1046     return (False, '%s: %s' % (msg, err))
1047   return (True, "Accept successfull")
1048
1049
1050 def FinalizeMigration(instance, info, success):
1051   """Finalize any preparation to accept an instance.
1052
1053   @type instance: L{objects.Instance}
1054   @param instance: the instance definition
1055   @type info: string/data (opaque)
1056   @param info: migration information, from the source node
1057   @type success: boolean
1058   @param success: whether the migration was a success or a failure
1059
1060   """
1061   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1062   try:
1063     hyper.FinalizeMigration(instance, info, success)
1064   except errors.HypervisorError, err:
1065     msg = "Failed to finalize migration"
1066     logging.exception(msg)
1067     return (False, '%s: %s' % (msg, err))
1068   return (True, "Migration Finalized")
1069
1070
1071 def MigrateInstance(instance, target, live):
1072   """Migrates an instance to another node.
1073
1074   @type instance: L{objects.Instance}
1075   @param instance: the instance definition
1076   @type target: string
1077   @param target: the target node name
1078   @type live: boolean
1079   @param live: whether the migration should be done live or not (the
1080       interpretation of this parameter is left to the hypervisor)
1081   @rtype: tuple
1082   @return: a tuple of (success, msg) where:
1083       - succes is a boolean denoting the success/failure of the operation
1084       - msg is a string with details in case of failure
1085
1086   """
1087   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1088
1089   try:
1090     hyper.MigrateInstance(instance.name, target, live)
1091   except errors.HypervisorError, err:
1092     msg = "Failed to migrate instance"
1093     logging.exception(msg)
1094     return (False, "%s: %s" % (msg, err))
1095   return (True, "Migration successfull")
1096
1097
1098 def BlockdevCreate(disk, size, owner, on_primary, info):
1099   """Creates a block device for an instance.
1100
1101   @type disk: L{objects.Disk}
1102   @param disk: the object describing the disk we should create
1103   @type size: int
1104   @param size: the size of the physical underlying device, in MiB
1105   @type owner: str
1106   @param owner: the name of the instance for which disk is created,
1107       used for device cache data
1108   @type on_primary: boolean
1109   @param on_primary:  indicates if it is the primary node or not
1110   @type info: string
1111   @param info: string that will be sent to the physical device
1112       creation, used for example to set (LVM) tags on LVs
1113
1114   @return: the new unique_id of the device (this can sometime be
1115       computed only after creation), or None. On secondary nodes,
1116       it's not required to return anything.
1117
1118   """
1119   clist = []
1120   if disk.children:
1121     for child in disk.children:
1122       try:
1123         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1124       except errors.BlockDeviceError, err:
1125         errmsg = "Can't assemble device %s: %s" % (child, err)
1126         logging.error(errmsg)
1127         return False, errmsg
1128       if on_primary or disk.AssembleOnSecondary():
1129         # we need the children open in case the device itself has to
1130         # be assembled
1131         try:
1132           crdev.Open()
1133         except errors.BlockDeviceError, err:
1134           errmsg = "Can't make child '%s' read-write: %s" % (child, err)
1135           logging.error(errmsg)
1136           return False, errmsg
1137       clist.append(crdev)
1138
1139   try:
1140     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1141   except errors.BlockDeviceError, err:
1142     return False, "Can't create block device: %s" % str(err)
1143
1144   if on_primary or disk.AssembleOnSecondary():
1145     try:
1146       device.Assemble()
1147     except errors.BlockDeviceError, err:
1148       errmsg = ("Can't assemble device after creation, very"
1149                 " unusual event: %s" % str(err))
1150       logging.error(errmsg)
1151       return False, errmsg
1152     device.SetSyncSpeed(constants.SYNC_SPEED)
1153     if on_primary or disk.OpenOnSecondary():
1154       try:
1155         device.Open(force=True)
1156       except errors.BlockDeviceError, err:
1157         errmsg = ("Can't make device r/w after creation, very"
1158                   " unusual event: %s" % str(err))
1159         logging.error(errmsg)
1160         return False, errmsg
1161     DevCacheManager.UpdateCache(device.dev_path, owner,
1162                                 on_primary, disk.iv_name)
1163
1164   device.SetInfo(info)
1165
1166   physical_id = device.unique_id
1167   return True, physical_id
1168
1169
1170 def BlockdevRemove(disk):
1171   """Remove a block device.
1172
1173   @note: This is intended to be called recursively.
1174
1175   @type disk: L{objects.Disk}
1176   @param disk: the disk object we should remove
1177   @rtype: boolean
1178   @return: the success of the operation
1179
1180   """
1181   msgs = []
1182   result = True
1183   try:
1184     rdev = _RecursiveFindBD(disk)
1185   except errors.BlockDeviceError, err:
1186     # probably can't attach
1187     logging.info("Can't attach to device %s in remove", disk)
1188     rdev = None
1189   if rdev is not None:
1190     r_path = rdev.dev_path
1191     try:
1192       rdev.Remove()
1193     except errors.BlockDeviceError, err:
1194       msgs.append(str(err))
1195       result = False
1196     if result:
1197       DevCacheManager.RemoveCache(r_path)
1198
1199   if disk.children:
1200     for child in disk.children:
1201       c_status, c_msg = BlockdevRemove(child)
1202       result = result and c_status
1203       if c_msg: # not an empty message
1204         msgs.append(c_msg)
1205
1206   return (result, "; ".join(msgs))
1207
1208
1209 def _RecursiveAssembleBD(disk, owner, as_primary):
1210   """Activate a block device for an instance.
1211
1212   This is run on the primary and secondary nodes for an instance.
1213
1214   @note: this function is called recursively.
1215
1216   @type disk: L{objects.Disk}
1217   @param disk: the disk we try to assemble
1218   @type owner: str
1219   @param owner: the name of the instance which owns the disk
1220   @type as_primary: boolean
1221   @param as_primary: if we should make the block device
1222       read/write
1223
1224   @return: the assembled device or None (in case no device
1225       was assembled)
1226   @raise errors.BlockDeviceError: in case there is an error
1227       during the activation of the children or the device
1228       itself
1229
1230   """
1231   children = []
1232   if disk.children:
1233     mcn = disk.ChildrenNeeded()
1234     if mcn == -1:
1235       mcn = 0 # max number of Nones allowed
1236     else:
1237       mcn = len(disk.children) - mcn # max number of Nones
1238     for chld_disk in disk.children:
1239       try:
1240         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1241       except errors.BlockDeviceError, err:
1242         if children.count(None) >= mcn:
1243           raise
1244         cdev = None
1245         logging.error("Error in child activation (but continuing): %s",
1246                       str(err))
1247       children.append(cdev)
1248
1249   if as_primary or disk.AssembleOnSecondary():
1250     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1251     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1252     result = r_dev
1253     if as_primary or disk.OpenOnSecondary():
1254       r_dev.Open()
1255     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1256                                 as_primary, disk.iv_name)
1257
1258   else:
1259     result = True
1260   return result
1261
1262
1263 def BlockdevAssemble(disk, owner, as_primary):
1264   """Activate a block device for an instance.
1265
1266   This is a wrapper over _RecursiveAssembleBD.
1267
1268   @rtype: str or boolean
1269   @return: a C{/dev/...} path for primary nodes, and
1270       C{True} for secondary nodes
1271
1272   """
1273   status = True
1274   result = "no error information"
1275   try:
1276     result = _RecursiveAssembleBD(disk, owner, as_primary)
1277     if isinstance(result, bdev.BlockDev):
1278       result = result.dev_path
1279   except errors.BlockDeviceError, err:
1280     result = "Error while assembling disk: %s" % str(err)
1281     status = False
1282   return (status, result)
1283
1284
1285 def BlockdevShutdown(disk):
1286   """Shut down a block device.
1287
1288   First, if the device is assembled (Attach() is successfull), then
1289   the device is shutdown. Then the children of the device are
1290   shutdown.
1291
1292   This function is called recursively. Note that we don't cache the
1293   children or such, as oppossed to assemble, shutdown of different
1294   devices doesn't require that the upper device was active.
1295
1296   @type disk: L{objects.Disk}
1297   @param disk: the description of the disk we should
1298       shutdown
1299   @rtype: boolean
1300   @return: the success of the operation
1301
1302   """
1303   msgs = []
1304   result = True
1305   r_dev = _RecursiveFindBD(disk)
1306   if r_dev is not None:
1307     r_path = r_dev.dev_path
1308     try:
1309       r_dev.Shutdown()
1310       DevCacheManager.RemoveCache(r_path)
1311     except errors.BlockDeviceError, err:
1312       msgs.append(str(err))
1313       result = False
1314
1315   if disk.children:
1316     for child in disk.children:
1317       c_status, c_msg = BlockdevShutdown(child)
1318       result = result and c_status
1319       if c_msg: # not an empty message
1320         msgs.append(c_msg)
1321
1322   return (result, "; ".join(msgs))
1323
1324
1325 def BlockdevAddchildren(parent_cdev, new_cdevs):
1326   """Extend a mirrored block device.
1327
1328   @type parent_cdev: L{objects.Disk}
1329   @param parent_cdev: the disk to which we should add children
1330   @type new_cdevs: list of L{objects.Disk}
1331   @param new_cdevs: the list of children which we should add
1332   @rtype: boolean
1333   @return: the success of the operation
1334
1335   """
1336   parent_bdev = _RecursiveFindBD(parent_cdev)
1337   if parent_bdev is None:
1338     logging.error("Can't find parent device")
1339     return False
1340   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1341   if new_bdevs.count(None) > 0:
1342     logging.error("Can't find new device(s) to add: %s:%s",
1343                   new_bdevs, new_cdevs)
1344     return False
1345   parent_bdev.AddChildren(new_bdevs)
1346   return True
1347
1348
1349 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1350   """Shrink a mirrored block device.
1351
1352   @type parent_cdev: L{objects.Disk}
1353   @param parent_cdev: the disk from which we should remove children
1354   @type new_cdevs: list of L{objects.Disk}
1355   @param new_cdevs: the list of children which we should remove
1356   @rtype: boolean
1357   @return: the success of the operation
1358
1359   """
1360   parent_bdev = _RecursiveFindBD(parent_cdev)
1361   if parent_bdev is None:
1362     logging.error("Can't find parent in remove children: %s", parent_cdev)
1363     return False
1364   devs = []
1365   for disk in new_cdevs:
1366     rpath = disk.StaticDevPath()
1367     if rpath is None:
1368       bd = _RecursiveFindBD(disk)
1369       if bd is None:
1370         logging.error("Can't find dynamic device %s while removing children",
1371                       disk)
1372         return False
1373       else:
1374         devs.append(bd.dev_path)
1375     else:
1376       devs.append(rpath)
1377   parent_bdev.RemoveChildren(devs)
1378   return True
1379
1380
1381 def BlockdevGetmirrorstatus(disks):
1382   """Get the mirroring status of a list of devices.
1383
1384   @type disks: list of L{objects.Disk}
1385   @param disks: the list of disks which we should query
1386   @rtype: disk
1387   @return:
1388       a list of (mirror_done, estimated_time) tuples, which
1389       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1390   @raise errors.BlockDeviceError: if any of the disks cannot be
1391       found
1392
1393   """
1394   stats = []
1395   for dsk in disks:
1396     rbd = _RecursiveFindBD(dsk)
1397     if rbd is None:
1398       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1399     stats.append(rbd.CombinedSyncStatus())
1400   return stats
1401
1402
1403 def _RecursiveFindBD(disk):
1404   """Check if a device is activated.
1405
1406   If so, return informations about the real device.
1407
1408   @type disk: L{objects.Disk}
1409   @param disk: the disk object we need to find
1410
1411   @return: None if the device can't be found,
1412       otherwise the device instance
1413
1414   """
1415   children = []
1416   if disk.children:
1417     for chdisk in disk.children:
1418       children.append(_RecursiveFindBD(chdisk))
1419
1420   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1421
1422
1423 def BlockdevFind(disk):
1424   """Check if a device is activated.
1425
1426   If it is, return informations about the real device.
1427
1428   @type disk: L{objects.Disk}
1429   @param disk: the disk to find
1430   @rtype: None or tuple
1431   @return: None if the disk cannot be found, otherwise a
1432       tuple (device_path, major, minor, sync_percent,
1433       estimated_time, is_degraded)
1434
1435   """
1436   try:
1437     rbd = _RecursiveFindBD(disk)
1438   except errors.BlockDeviceError, err:
1439     return (False, str(err))
1440   if rbd is None:
1441     return (True, None)
1442   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1443
1444
1445 def BlockdevGetsize(disks):
1446   """Computes the size of the given disks.
1447
1448   If a disk is not found, returns None instead.
1449
1450   @type disks: list of L{objects.Disk}
1451   @param disks: the list of disk to compute the size for
1452   @rtype: list
1453   @return: list with elements None if the disk cannot be found,
1454       otherwise the size
1455
1456   """
1457   result = []
1458   for cf in disks:
1459     try:
1460       rbd = _RecursiveFindBD(cf)
1461     except errors.BlockDeviceError, err:
1462       result.append(None)
1463       continue
1464     if rbd is None:
1465       result.append(None)
1466     else:
1467       result.append(rbd.GetActualSize())
1468   return result
1469
1470
1471 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1472   """Write a file to the filesystem.
1473
1474   This allows the master to overwrite(!) a file. It will only perform
1475   the operation if the file belongs to a list of configuration files.
1476
1477   @type file_name: str
1478   @param file_name: the target file name
1479   @type data: str
1480   @param data: the new contents of the file
1481   @type mode: int
1482   @param mode: the mode to give the file (can be None)
1483   @type uid: int
1484   @param uid: the owner of the file (can be -1 for default)
1485   @type gid: int
1486   @param gid: the group of the file (can be -1 for default)
1487   @type atime: float
1488   @param atime: the atime to set on the file (can be None)
1489   @type mtime: float
1490   @param mtime: the mtime to set on the file (can be None)
1491   @rtype: boolean
1492   @return: the success of the operation; errors are logged
1493       in the node daemon log
1494
1495   """
1496   if not os.path.isabs(file_name):
1497     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1498                   file_name)
1499     return False
1500
1501   allowed_files = [
1502     constants.CLUSTER_CONF_FILE,
1503     constants.ETC_HOSTS,
1504     constants.SSH_KNOWN_HOSTS_FILE,
1505     constants.VNC_PASSWORD_FILE,
1506     ]
1507
1508   if file_name not in allowed_files:
1509     logging.error("Filename passed to UploadFile not in allowed"
1510                  " upload targets: '%s'", file_name)
1511     return False
1512
1513   raw_data = _Decompress(data)
1514
1515   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1516                   atime=atime, mtime=mtime)
1517   return True
1518
1519
1520 def WriteSsconfFiles(values):
1521   """Update all ssconf files.
1522
1523   Wrapper around the SimpleStore.WriteFiles.
1524
1525   """
1526   ssconf.SimpleStore().WriteFiles(values)
1527
1528
1529 def _ErrnoOrStr(err):
1530   """Format an EnvironmentError exception.
1531
1532   If the L{err} argument has an errno attribute, it will be looked up
1533   and converted into a textual C{E...} description. Otherwise the
1534   string representation of the error will be returned.
1535
1536   @type err: L{EnvironmentError}
1537   @param err: the exception to format
1538
1539   """
1540   if hasattr(err, 'errno'):
1541     detail = errno.errorcode[err.errno]
1542   else:
1543     detail = str(err)
1544   return detail
1545
1546
1547 def _OSOndiskVersion(name, os_dir):
1548   """Compute and return the API version of a given OS.
1549
1550   This function will try to read the API version of the OS given by
1551   the 'name' parameter and residing in the 'os_dir' directory.
1552
1553   @type name: str
1554   @param name: the OS name we should look for
1555   @type os_dir: str
1556   @param os_dir: the directory inwhich we should look for the OS
1557   @rtype: int or None
1558   @return:
1559       Either an integer denoting the version or None in the
1560       case when this is not a valid OS name.
1561   @raise errors.InvalidOS: if the OS cannot be found
1562
1563   """
1564   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1565
1566   try:
1567     st = os.stat(api_file)
1568   except EnvironmentError, err:
1569     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1570                            " found (%s)" % _ErrnoOrStr(err))
1571
1572   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1573     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1574                            " a regular file")
1575
1576   try:
1577     f = open(api_file)
1578     try:
1579       api_versions = f.readlines()
1580     finally:
1581       f.close()
1582   except EnvironmentError, err:
1583     raise errors.InvalidOS(name, os_dir, "error while reading the"
1584                            " API version (%s)" % _ErrnoOrStr(err))
1585
1586   api_versions = [version.strip() for version in api_versions]
1587   try:
1588     api_versions = [int(version) for version in api_versions]
1589   except (TypeError, ValueError), err:
1590     raise errors.InvalidOS(name, os_dir,
1591                            "API version is not integer (%s)" % str(err))
1592
1593   return api_versions
1594
1595
1596 def DiagnoseOS(top_dirs=None):
1597   """Compute the validity for all OSes.
1598
1599   @type top_dirs: list
1600   @param top_dirs: the list of directories in which to
1601       search (if not given defaults to
1602       L{constants.OS_SEARCH_PATH})
1603   @rtype: list of L{objects.OS}
1604   @return: an OS object for each name in all the given
1605       directories
1606
1607   """
1608   if top_dirs is None:
1609     top_dirs = constants.OS_SEARCH_PATH
1610
1611   result = []
1612   for dir_name in top_dirs:
1613     if os.path.isdir(dir_name):
1614       try:
1615         f_names = utils.ListVisibleFiles(dir_name)
1616       except EnvironmentError, err:
1617         logging.exception("Can't list the OS directory %s", dir_name)
1618         break
1619       for name in f_names:
1620         try:
1621           os_inst = OSFromDisk(name, base_dir=dir_name)
1622           result.append(os_inst)
1623         except errors.InvalidOS, err:
1624           result.append(objects.OS.FromInvalidOS(err))
1625
1626   return result
1627
1628
1629 def OSFromDisk(name, base_dir=None):
1630   """Create an OS instance from disk.
1631
1632   This function will return an OS instance if the given name is a
1633   valid OS name. Otherwise, it will raise an appropriate
1634   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1635
1636   @type base_dir: string
1637   @keyword base_dir: Base directory containing OS installations.
1638                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1639   @rtype: L{objects.OS}
1640   @return: the OS instance if we find a valid one
1641   @raise errors.InvalidOS: if we don't find a valid OS
1642
1643   """
1644   if base_dir is None:
1645     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1646     if os_dir is None:
1647       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1648   else:
1649     os_dir = os.path.sep.join([base_dir, name])
1650
1651   api_versions = _OSOndiskVersion(name, os_dir)
1652
1653   if constants.OS_API_VERSION not in api_versions:
1654     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1655                            " (found %s want %s)"
1656                            % (api_versions, constants.OS_API_VERSION))
1657
1658   # OS Scripts dictionary, we will populate it with the actual script names
1659   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1660
1661   for script in os_scripts:
1662     os_scripts[script] = os.path.sep.join([os_dir, script])
1663
1664     try:
1665       st = os.stat(os_scripts[script])
1666     except EnvironmentError, err:
1667       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1668                              (script, _ErrnoOrStr(err)))
1669
1670     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1671       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1672                              script)
1673
1674     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1675       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1676                              script)
1677
1678
1679   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1680                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1681                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1682                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1683                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1684                     api_versions=api_versions)
1685
1686 def OSEnvironment(instance, debug=0):
1687   """Calculate the environment for an os script.
1688
1689   @type instance: L{objects.Instance}
1690   @param instance: target instance for the os script run
1691   @type debug: integer
1692   @param debug: debug level (0 or 1, for OS Api 10)
1693   @rtype: dict
1694   @return: dict of environment variables
1695   @raise errors.BlockDeviceError: if the block device
1696       cannot be found
1697
1698   """
1699   result = {}
1700   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1701   result['INSTANCE_NAME'] = instance.name
1702   result['INSTANCE_OS'] = instance.os
1703   result['HYPERVISOR'] = instance.hypervisor
1704   result['DISK_COUNT'] = '%d' % len(instance.disks)
1705   result['NIC_COUNT'] = '%d' % len(instance.nics)
1706   result['DEBUG_LEVEL'] = '%d' % debug
1707   for idx, disk in enumerate(instance.disks):
1708     real_disk = _RecursiveFindBD(disk)
1709     if real_disk is None:
1710       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1711                                     str(disk))
1712     real_disk.Open()
1713     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1714     result['DISK_%d_ACCESS' % idx] = disk.mode
1715     if constants.HV_DISK_TYPE in instance.hvparams:
1716       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1717         instance.hvparams[constants.HV_DISK_TYPE]
1718     if disk.dev_type in constants.LDS_BLOCK:
1719       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1720     elif disk.dev_type == constants.LD_FILE:
1721       result['DISK_%d_BACKEND_TYPE' % idx] = \
1722         'file:%s' % disk.physical_id[0]
1723   for idx, nic in enumerate(instance.nics):
1724     result['NIC_%d_MAC' % idx] = nic.mac
1725     if nic.ip:
1726       result['NIC_%d_IP' % idx] = nic.ip
1727     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1728     if constants.HV_NIC_TYPE in instance.hvparams:
1729       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1730         instance.hvparams[constants.HV_NIC_TYPE]
1731
1732   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1733     for key, value in source.items():
1734       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1735
1736   return result
1737
1738 def BlockdevGrow(disk, amount):
1739   """Grow a stack of block devices.
1740
1741   This function is called recursively, with the childrens being the
1742   first ones to resize.
1743
1744   @type disk: L{objects.Disk}
1745   @param disk: the disk to be grown
1746   @rtype: (status, result)
1747   @return: a tuple with the status of the operation
1748       (True/False), and the errors message if status
1749       is False
1750
1751   """
1752   r_dev = _RecursiveFindBD(disk)
1753   if r_dev is None:
1754     return False, "Cannot find block device %s" % (disk,)
1755
1756   try:
1757     r_dev.Grow(amount)
1758   except errors.BlockDeviceError, err:
1759     return False, str(err)
1760
1761   return True, None
1762
1763
1764 def BlockdevSnapshot(disk):
1765   """Create a snapshot copy of a block device.
1766
1767   This function is called recursively, and the snapshot is actually created
1768   just for the leaf lvm backend device.
1769
1770   @type disk: L{objects.Disk}
1771   @param disk: the disk to be snapshotted
1772   @rtype: string
1773   @return: snapshot disk path
1774
1775   """
1776   if disk.children:
1777     if len(disk.children) == 1:
1778       # only one child, let's recurse on it
1779       return BlockdevSnapshot(disk.children[0])
1780     else:
1781       # more than one child, choose one that matches
1782       for child in disk.children:
1783         if child.size == disk.size:
1784           # return implies breaking the loop
1785           return BlockdevSnapshot(child)
1786   elif disk.dev_type == constants.LD_LV:
1787     r_dev = _RecursiveFindBD(disk)
1788     if r_dev is not None:
1789       # let's stay on the safe side and ask for the full size, for now
1790       return r_dev.Snapshot(disk.size)
1791     else:
1792       return None
1793   else:
1794     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1795                                  " '%s' of type '%s'" %
1796                                  (disk.unique_id, disk.dev_type))
1797
1798
1799 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1800   """Export a block device snapshot to a remote node.
1801
1802   @type disk: L{objects.Disk}
1803   @param disk: the description of the disk to export
1804   @type dest_node: str
1805   @param dest_node: the destination node to export to
1806   @type instance: L{objects.Instance}
1807   @param instance: the instance object to whom the disk belongs
1808   @type cluster_name: str
1809   @param cluster_name: the cluster name, needed for SSH hostalias
1810   @type idx: int
1811   @param idx: the index of the disk in the instance's disk list,
1812       used to export to the OS scripts environment
1813   @rtype: boolean
1814   @return: the success of the operation
1815
1816   """
1817   export_env = OSEnvironment(instance)
1818
1819   inst_os = OSFromDisk(instance.os)
1820   export_script = inst_os.export_script
1821
1822   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1823                                      instance.name, int(time.time()))
1824   if not os.path.exists(constants.LOG_OS_DIR):
1825     os.mkdir(constants.LOG_OS_DIR, 0750)
1826   real_disk = _RecursiveFindBD(disk)
1827   if real_disk is None:
1828     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1829                                   str(disk))
1830   real_disk.Open()
1831
1832   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1833   export_env['EXPORT_INDEX'] = str(idx)
1834
1835   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1836   destfile = disk.physical_id[1]
1837
1838   # the target command is built out of three individual commands,
1839   # which are joined by pipes; we check each individual command for
1840   # valid parameters
1841   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1842                                export_script, logfile)
1843
1844   comprcmd = "gzip"
1845
1846   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1847                                 destdir, destdir, destfile)
1848   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1849                                                    constants.GANETI_RUNAS,
1850                                                    destcmd)
1851
1852   # all commands have been checked, so we're safe to combine them
1853   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1854
1855   result = utils.RunCmd(command, env=export_env)
1856
1857   if result.failed:
1858     logging.error("os snapshot export command '%s' returned error: %s"
1859                   " output: %s", command, result.fail_reason, result.output)
1860     return False
1861
1862   return True
1863
1864
1865 def FinalizeExport(instance, snap_disks):
1866   """Write out the export configuration information.
1867
1868   @type instance: L{objects.Instance}
1869   @param instance: the instance which we export, used for
1870       saving configuration
1871   @type snap_disks: list of L{objects.Disk}
1872   @param snap_disks: list of snapshot block devices, which
1873       will be used to get the actual name of the dump file
1874
1875   @rtype: boolean
1876   @return: the success of the operation
1877
1878   """
1879   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1880   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1881
1882   config = objects.SerializableConfigParser()
1883
1884   config.add_section(constants.INISECT_EXP)
1885   config.set(constants.INISECT_EXP, 'version', '0')
1886   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1887   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1888   config.set(constants.INISECT_EXP, 'os', instance.os)
1889   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1890
1891   config.add_section(constants.INISECT_INS)
1892   config.set(constants.INISECT_INS, 'name', instance.name)
1893   config.set(constants.INISECT_INS, 'memory', '%d' %
1894              instance.beparams[constants.BE_MEMORY])
1895   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1896              instance.beparams[constants.BE_VCPUS])
1897   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1898
1899   nic_total = 0
1900   for nic_count, nic in enumerate(instance.nics):
1901     nic_total += 1
1902     config.set(constants.INISECT_INS, 'nic%d_mac' %
1903                nic_count, '%s' % nic.mac)
1904     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1905     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1906                '%s' % nic.bridge)
1907   # TODO: redundant: on load can read nics until it doesn't exist
1908   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1909
1910   disk_total = 0
1911   for disk_count, disk in enumerate(snap_disks):
1912     if disk:
1913       disk_total += 1
1914       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1915                  ('%s' % disk.iv_name))
1916       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1917                  ('%s' % disk.physical_id[1]))
1918       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1919                  ('%d' % disk.size))
1920
1921   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1922
1923   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1924                   data=config.Dumps())
1925   shutil.rmtree(finaldestdir, True)
1926   shutil.move(destdir, finaldestdir)
1927
1928   return True
1929
1930
1931 def ExportInfo(dest):
1932   """Get export configuration information.
1933
1934   @type dest: str
1935   @param dest: directory containing the export
1936
1937   @rtype: L{objects.SerializableConfigParser}
1938   @return: a serializable config file containing the
1939       export info
1940
1941   """
1942   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1943
1944   config = objects.SerializableConfigParser()
1945   config.read(cff)
1946
1947   if (not config.has_section(constants.INISECT_EXP) or
1948       not config.has_section(constants.INISECT_INS)):
1949     return None
1950
1951   return config
1952
1953
1954 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1955   """Import an os image into an instance.
1956
1957   @type instance: L{objects.Instance}
1958   @param instance: instance to import the disks into
1959   @type src_node: string
1960   @param src_node: source node for the disk images
1961   @type src_images: list of string
1962   @param src_images: absolute paths of the disk images
1963   @rtype: list of boolean
1964   @return: each boolean represent the success of importing the n-th disk
1965
1966   """
1967   import_env = OSEnvironment(instance)
1968   inst_os = OSFromDisk(instance.os)
1969   import_script = inst_os.import_script
1970
1971   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1972                                         instance.name, int(time.time()))
1973   if not os.path.exists(constants.LOG_OS_DIR):
1974     os.mkdir(constants.LOG_OS_DIR, 0750)
1975
1976   comprcmd = "gunzip"
1977   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1978                                import_script, logfile)
1979
1980   final_result = []
1981   for idx, image in enumerate(src_images):
1982     if image:
1983       destcmd = utils.BuildShellCmd('cat %s', image)
1984       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1985                                                        constants.GANETI_RUNAS,
1986                                                        destcmd)
1987       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1988       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1989       import_env['IMPORT_INDEX'] = str(idx)
1990       result = utils.RunCmd(command, env=import_env)
1991       if result.failed:
1992         logging.error("Disk import command '%s' returned error: %s"
1993                       " output: %s", command, result.fail_reason,
1994                       result.output)
1995         final_result.append(False)
1996       else:
1997         final_result.append(True)
1998     else:
1999       final_result.append(True)
2000
2001   return final_result
2002
2003
2004 def ListExports():
2005   """Return a list of exports currently available on this machine.
2006
2007   @rtype: list
2008   @return: list of the exports
2009
2010   """
2011   if os.path.isdir(constants.EXPORT_DIR):
2012     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2013   else:
2014     return []
2015
2016
2017 def RemoveExport(export):
2018   """Remove an existing export from the node.
2019
2020   @type export: str
2021   @param export: the name of the export to remove
2022   @rtype: boolean
2023   @return: the success of the operation
2024
2025   """
2026   target = os.path.join(constants.EXPORT_DIR, export)
2027
2028   shutil.rmtree(target)
2029   # TODO: catch some of the relevant exceptions and provide a pretty
2030   # error message if rmtree fails.
2031
2032   return True
2033
2034
2035 def BlockdevRename(devlist):
2036   """Rename a list of block devices.
2037
2038   @type devlist: list of tuples
2039   @param devlist: list of tuples of the form  (disk,
2040       new_logical_id, new_physical_id); disk is an
2041       L{objects.Disk} object describing the current disk,
2042       and new logical_id/physical_id is the name we
2043       rename it to
2044   @rtype: boolean
2045   @return: True if all renames succeeded, False otherwise
2046
2047   """
2048   result = True
2049   for disk, unique_id in devlist:
2050     dev = _RecursiveFindBD(disk)
2051     if dev is None:
2052       result = False
2053       continue
2054     try:
2055       old_rpath = dev.dev_path
2056       dev.Rename(unique_id)
2057       new_rpath = dev.dev_path
2058       if old_rpath != new_rpath:
2059         DevCacheManager.RemoveCache(old_rpath)
2060         # FIXME: we should add the new cache information here, like:
2061         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2062         # but we don't have the owner here - maybe parse from existing
2063         # cache? for now, we only lose lvm data when we rename, which
2064         # is less critical than DRBD or MD
2065     except errors.BlockDeviceError, err:
2066       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2067       result = False
2068   return result
2069
2070
2071 def _TransformFileStorageDir(file_storage_dir):
2072   """Checks whether given file_storage_dir is valid.
2073
2074   Checks wheter the given file_storage_dir is within the cluster-wide
2075   default file_storage_dir stored in SimpleStore. Only paths under that
2076   directory are allowed.
2077
2078   @type file_storage_dir: str
2079   @param file_storage_dir: the path to check
2080
2081   @return: the normalized path if valid, None otherwise
2082
2083   """
2084   cfg = _GetConfig()
2085   file_storage_dir = os.path.normpath(file_storage_dir)
2086   base_file_storage_dir = cfg.GetFileStorageDir()
2087   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2088       base_file_storage_dir):
2089     logging.error("file storage directory '%s' is not under base file"
2090                   " storage directory '%s'",
2091                   file_storage_dir, base_file_storage_dir)
2092     return None
2093   return file_storage_dir
2094
2095
2096 def CreateFileStorageDir(file_storage_dir):
2097   """Create file storage directory.
2098
2099   @type file_storage_dir: str
2100   @param file_storage_dir: directory to create
2101
2102   @rtype: tuple
2103   @return: tuple with first element a boolean indicating wheter dir
2104       creation was successful or not
2105
2106   """
2107   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2108   result = True,
2109   if not file_storage_dir:
2110     result = False,
2111   else:
2112     if os.path.exists(file_storage_dir):
2113       if not os.path.isdir(file_storage_dir):
2114         logging.error("'%s' is not a directory", file_storage_dir)
2115         result = False,
2116     else:
2117       try:
2118         os.makedirs(file_storage_dir, 0750)
2119       except OSError, err:
2120         logging.error("Cannot create file storage directory '%s': %s",
2121                       file_storage_dir, err)
2122         result = False,
2123   return result
2124
2125
2126 def RemoveFileStorageDir(file_storage_dir):
2127   """Remove file storage directory.
2128
2129   Remove it only if it's empty. If not log an error and return.
2130
2131   @type file_storage_dir: str
2132   @param file_storage_dir: the directory we should cleanup
2133   @rtype: tuple (success,)
2134   @return: tuple of one element, C{success}, denoting
2135       whether the operation was successfull
2136
2137   """
2138   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2139   result = True,
2140   if not file_storage_dir:
2141     result = False,
2142   else:
2143     if os.path.exists(file_storage_dir):
2144       if not os.path.isdir(file_storage_dir):
2145         logging.error("'%s' is not a directory", file_storage_dir)
2146         result = False,
2147       # deletes dir only if empty, otherwise we want to return False
2148       try:
2149         os.rmdir(file_storage_dir)
2150       except OSError, err:
2151         logging.exception("Cannot remove file storage directory '%s'",
2152                           file_storage_dir)
2153         result = False,
2154   return result
2155
2156
2157 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2158   """Rename the file storage directory.
2159
2160   @type old_file_storage_dir: str
2161   @param old_file_storage_dir: the current path
2162   @type new_file_storage_dir: str
2163   @param new_file_storage_dir: the name we should rename to
2164   @rtype: tuple (success,)
2165   @return: tuple of one element, C{success}, denoting
2166       whether the operation was successful
2167
2168   """
2169   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2170   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2171   result = True,
2172   if not old_file_storage_dir or not new_file_storage_dir:
2173     result = False,
2174   else:
2175     if not os.path.exists(new_file_storage_dir):
2176       if os.path.isdir(old_file_storage_dir):
2177         try:
2178           os.rename(old_file_storage_dir, new_file_storage_dir)
2179         except OSError, err:
2180           logging.exception("Cannot rename '%s' to '%s'",
2181                             old_file_storage_dir, new_file_storage_dir)
2182           result =  False,
2183       else:
2184         logging.error("'%s' is not a directory", old_file_storage_dir)
2185         result = False,
2186     else:
2187       if os.path.exists(old_file_storage_dir):
2188         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2189                       old_file_storage_dir, new_file_storage_dir)
2190         result = False,
2191   return result
2192
2193
2194 def _IsJobQueueFile(file_name):
2195   """Checks whether the given filename is in the queue directory.
2196
2197   @type file_name: str
2198   @param file_name: the file name we should check
2199   @rtype: boolean
2200   @return: whether the file is under the queue directory
2201
2202   """
2203   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2204   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2205
2206   if not result:
2207     logging.error("'%s' is not a file in the queue directory",
2208                   file_name)
2209
2210   return result
2211
2212
2213 def JobQueueUpdate(file_name, content):
2214   """Updates a file in the queue directory.
2215
2216   This is just a wrapper over L{utils.WriteFile}, with proper
2217   checking.
2218
2219   @type file_name: str
2220   @param file_name: the job file name
2221   @type content: str
2222   @param content: the new job contents
2223   @rtype: boolean
2224   @return: the success of the operation
2225
2226   """
2227   if not _IsJobQueueFile(file_name):
2228     return False
2229
2230   # Write and replace the file atomically
2231   utils.WriteFile(file_name, data=_Decompress(content))
2232
2233   return True
2234
2235
2236 def JobQueueRename(old, new):
2237   """Renames a job queue file.
2238
2239   This is just a wrapper over os.rename with proper checking.
2240
2241   @type old: str
2242   @param old: the old (actual) file name
2243   @type new: str
2244   @param new: the desired file name
2245   @rtype: boolean
2246   @return: the success of the operation
2247
2248   """
2249   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2250     return False
2251
2252   utils.RenameFile(old, new, mkdir=True)
2253
2254   return True
2255
2256
2257 def JobQueueSetDrainFlag(drain_flag):
2258   """Set the drain flag for the queue.
2259
2260   This will set or unset the queue drain flag.
2261
2262   @type drain_flag: boolean
2263   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2264   @rtype: boolean
2265   @return: always True
2266   @warning: the function always returns True
2267
2268   """
2269   if drain_flag:
2270     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2271   else:
2272     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2273
2274   return True
2275
2276
2277 def BlockdevClose(instance_name, disks):
2278   """Closes the given block devices.
2279
2280   This means they will be switched to secondary mode (in case of
2281   DRBD).
2282
2283   @param instance_name: if the argument is not empty, the symlinks
2284       of this instance will be removed
2285   @type disks: list of L{objects.Disk}
2286   @param disks: the list of disks to be closed
2287   @rtype: tuple (success, message)
2288   @return: a tuple of success and message, where success
2289       indicates the succes of the operation, and message
2290       which will contain the error details in case we
2291       failed
2292
2293   """
2294   bdevs = []
2295   for cf in disks:
2296     rd = _RecursiveFindBD(cf)
2297     if rd is None:
2298       return (False, "Can't find device %s" % cf)
2299     bdevs.append(rd)
2300
2301   msg = []
2302   for rd in bdevs:
2303     try:
2304       rd.Close()
2305     except errors.BlockDeviceError, err:
2306       msg.append(str(err))
2307   if msg:
2308     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2309   else:
2310     if instance_name:
2311       _RemoveBlockDevLinks(instance_name, disks)
2312     return (True, "All devices secondary")
2313
2314
2315 def ValidateHVParams(hvname, hvparams):
2316   """Validates the given hypervisor parameters.
2317
2318   @type hvname: string
2319   @param hvname: the hypervisor name
2320   @type hvparams: dict
2321   @param hvparams: the hypervisor parameters to be validated
2322   @rtype: tuple (success, message)
2323   @return: a tuple of success and message, where success
2324       indicates the succes of the operation, and message
2325       which will contain the error details in case we
2326       failed
2327
2328   """
2329   try:
2330     hv_type = hypervisor.GetHypervisor(hvname)
2331     hv_type.ValidateParameters(hvparams)
2332     return (True, "Validation passed")
2333   except errors.HypervisorError, err:
2334     return (False, str(err))
2335
2336
2337 def DemoteFromMC():
2338   """Demotes the current node from master candidate role.
2339
2340   """
2341   # try to ensure we're not the master by mistake
2342   master, myself = ssconf.GetMasterAndMyself()
2343   if master == myself:
2344     return (False, "ssconf status shows I'm the master node, will not demote")
2345   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2346   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2347     return (False, "The master daemon is running, will not demote")
2348   try:
2349     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2350       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2351   except EnvironmentError, err:
2352     if err.errno != errno.ENOENT:
2353       return (False, "Error while backing up cluster file: %s" % str(err))
2354   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2355   return (True, "Done")
2356
2357
2358 def _FindDisks(nodes_ip, disks):
2359   """Sets the physical ID on disks and returns the block devices.
2360
2361   """
2362   # set the correct physical ID
2363   my_name = utils.HostInfo().name
2364   for cf in disks:
2365     cf.SetPhysicalID(my_name, nodes_ip)
2366
2367   bdevs = []
2368
2369   for cf in disks:
2370     rd = _RecursiveFindBD(cf)
2371     if rd is None:
2372       return (False, "Can't find device %s" % cf)
2373     bdevs.append(rd)
2374   return (True, bdevs)
2375
2376
2377 def DrbdDisconnectNet(nodes_ip, disks):
2378   """Disconnects the network on a list of drbd devices.
2379
2380   """
2381   status, bdevs = _FindDisks(nodes_ip, disks)
2382   if not status:
2383     return status, bdevs
2384
2385   # disconnect disks
2386   for rd in bdevs:
2387     try:
2388       rd.DisconnectNet()
2389     except errors.BlockDeviceError, err:
2390       logging.exception("Failed to go into standalone mode")
2391       return (False, "Can't change network configuration: %s" % str(err))
2392   return (True, "All disks are now disconnected")
2393
2394
2395 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2396   """Attaches the network on a list of drbd devices.
2397
2398   """
2399   status, bdevs = _FindDisks(nodes_ip, disks)
2400   if not status:
2401     return status, bdevs
2402
2403   if multimaster:
2404     for idx, rd in enumerate(bdevs):
2405       try:
2406         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2407       except EnvironmentError, err:
2408         return (False, "Can't create symlink: %s" % str(err))
2409   # reconnect disks, switch to new master configuration and if
2410   # needed primary mode
2411   for rd in bdevs:
2412     try:
2413       rd.AttachNet(multimaster)
2414     except errors.BlockDeviceError, err:
2415       return (False, "Can't change network configuration: %s" % str(err))
2416   # wait until the disks are connected; we need to retry the re-attach
2417   # if the device becomes standalone, as this might happen if the one
2418   # node disconnects and reconnects in a different mode before the
2419   # other node reconnects; in this case, one or both of the nodes will
2420   # decide it has wrong configuration and switch to standalone
2421   RECONNECT_TIMEOUT = 2 * 60
2422   sleep_time = 0.100 # start with 100 miliseconds
2423   timeout_limit = time.time() + RECONNECT_TIMEOUT
2424   while time.time() < timeout_limit:
2425     all_connected = True
2426     for rd in bdevs:
2427       stats = rd.GetProcStatus()
2428       if not (stats.is_connected or stats.is_in_resync):
2429         all_connected = False
2430       if stats.is_standalone:
2431         # peer had different config info and this node became
2432         # standalone, even though this should not happen with the
2433         # new staged way of changing disk configs
2434         try:
2435           rd.AttachNet(multimaster)
2436         except errors.BlockDeviceError, err:
2437           return (False, "Can't change network configuration: %s" % str(err))
2438     if all_connected:
2439       break
2440     time.sleep(sleep_time)
2441     sleep_time = min(5, sleep_time * 1.5)
2442   if not all_connected:
2443     return (False, "Timeout in disk reconnecting")
2444   if multimaster:
2445     # change to primary mode
2446     for rd in bdevs:
2447       try:
2448         rd.Open()
2449       except errors.BlockDeviceError, err:
2450         return (False, "Can't change to primary mode: %s" % str(err))
2451   if multimaster:
2452     msg = "multi-master and primary"
2453   else:
2454     msg = "single-master"
2455   return (True, "Disks are now configured as %s" % msg)
2456
2457
2458 def DrbdWaitSync(nodes_ip, disks):
2459   """Wait until DRBDs have synchronized.
2460
2461   """
2462   status, bdevs = _FindDisks(nodes_ip, disks)
2463   if not status:
2464     return status, bdevs
2465
2466   min_resync = 100
2467   alldone = True
2468   failure = False
2469   for rd in bdevs:
2470     stats = rd.GetProcStatus()
2471     if not (stats.is_connected or stats.is_in_resync):
2472       failure = True
2473       break
2474     alldone = alldone and (not stats.is_in_resync)
2475     if stats.sync_percent is not None:
2476       min_resync = min(min_resync, stats.sync_percent)
2477   return (not failure, (alldone, min_resync))
2478
2479
2480 class HooksRunner(object):
2481   """Hook runner.
2482
2483   This class is instantiated on the node side (ganeti-noded) and not
2484   on the master side.
2485
2486   """
2487   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2488
2489   def __init__(self, hooks_base_dir=None):
2490     """Constructor for hooks runner.
2491
2492     @type hooks_base_dir: str or None
2493     @param hooks_base_dir: if not None, this overrides the
2494         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2495
2496     """
2497     if hooks_base_dir is None:
2498       hooks_base_dir = constants.HOOKS_BASE_DIR
2499     self._BASE_DIR = hooks_base_dir
2500
2501   @staticmethod
2502   def ExecHook(script, env):
2503     """Exec one hook script.
2504
2505     @type script: str
2506     @param script: the full path to the script
2507     @type env: dict
2508     @param env: the environment with which to exec the script
2509     @rtype: tuple (success, message)
2510     @return: a tuple of success and message, where success
2511         indicates the succes of the operation, and message
2512         which will contain the error details in case we
2513         failed
2514
2515     """
2516     # exec the process using subprocess and log the output
2517     fdstdin = None
2518     try:
2519       fdstdin = open("/dev/null", "r")
2520       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2521                                stderr=subprocess.STDOUT, close_fds=True,
2522                                shell=False, cwd="/", env=env)
2523       output = ""
2524       try:
2525         output = child.stdout.read(4096)
2526         child.stdout.close()
2527       except EnvironmentError, err:
2528         output += "Hook script error: %s" % str(err)
2529
2530       while True:
2531         try:
2532           result = child.wait()
2533           break
2534         except EnvironmentError, err:
2535           if err.errno == errno.EINTR:
2536             continue
2537           raise
2538     finally:
2539       # try not to leak fds
2540       for fd in (fdstdin, ):
2541         if fd is not None:
2542           try:
2543             fd.close()
2544           except EnvironmentError, err:
2545             # just log the error
2546             #logging.exception("Error while closing fd %s", fd)
2547             pass
2548
2549     return result == 0, utils.SafeEncode(output.strip())
2550
2551   def RunHooks(self, hpath, phase, env):
2552     """Run the scripts in the hooks directory.
2553
2554     @type hpath: str
2555     @param hpath: the path to the hooks directory which
2556         holds the scripts
2557     @type phase: str
2558     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2559         L{constants.HOOKS_PHASE_POST}
2560     @type env: dict
2561     @param env: dictionary with the environment for the hook
2562     @rtype: list
2563     @return: list of 3-element tuples:
2564       - script path
2565       - script result, either L{constants.HKR_SUCCESS} or
2566         L{constants.HKR_FAIL}
2567       - output of the script
2568
2569     @raise errors.ProgrammerError: for invalid input
2570         parameters
2571
2572     """
2573     if phase == constants.HOOKS_PHASE_PRE:
2574       suffix = "pre"
2575     elif phase == constants.HOOKS_PHASE_POST:
2576       suffix = "post"
2577     else:
2578       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2579     rr = []
2580
2581     subdir = "%s-%s.d" % (hpath, suffix)
2582     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2583     try:
2584       dir_contents = utils.ListVisibleFiles(dir_name)
2585     except OSError, err:
2586       # FIXME: must log output in case of failures
2587       return rr
2588
2589     # we use the standard python sort order,
2590     # so 00name is the recommended naming scheme
2591     dir_contents.sort()
2592     for relname in dir_contents:
2593       fname = os.path.join(dir_name, relname)
2594       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2595           self.RE_MASK.match(relname) is not None):
2596         rrval = constants.HKR_SKIP
2597         output = ""
2598       else:
2599         result, output = self.ExecHook(fname, env)
2600         if not result:
2601           rrval = constants.HKR_FAIL
2602         else:
2603           rrval = constants.HKR_SUCCESS
2604       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2605
2606     return rr
2607
2608
2609 class IAllocatorRunner(object):
2610   """IAllocator runner.
2611
2612   This class is instantiated on the node side (ganeti-noded) and not on
2613   the master side.
2614
2615   """
2616   def Run(self, name, idata):
2617     """Run an iallocator script.
2618
2619     @type name: str
2620     @param name: the iallocator script name
2621     @type idata: str
2622     @param idata: the allocator input data
2623
2624     @rtype: tuple
2625     @return: four element tuple of:
2626        - run status (one of the IARUN_ constants)
2627        - stdout
2628        - stderr
2629        - fail reason (as from L{utils.RunResult})
2630
2631     """
2632     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2633                                   os.path.isfile)
2634     if alloc_script is None:
2635       return (constants.IARUN_NOTFOUND, None, None, None)
2636
2637     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2638     try:
2639       os.write(fd, idata)
2640       os.close(fd)
2641       result = utils.RunCmd([alloc_script, fin_name])
2642       if result.failed:
2643         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2644                 result.fail_reason)
2645     finally:
2646       os.unlink(fin_name)
2647
2648     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2649
2650
2651 class DevCacheManager(object):
2652   """Simple class for managing a cache of block device information.
2653
2654   """
2655   _DEV_PREFIX = "/dev/"
2656   _ROOT_DIR = constants.BDEV_CACHE_DIR
2657
2658   @classmethod
2659   def _ConvertPath(cls, dev_path):
2660     """Converts a /dev/name path to the cache file name.
2661
2662     This replaces slashes with underscores and strips the /dev
2663     prefix. It then returns the full path to the cache file.
2664
2665     @type dev_path: str
2666     @param dev_path: the C{/dev/} path name
2667     @rtype: str
2668     @return: the converted path name
2669
2670     """
2671     if dev_path.startswith(cls._DEV_PREFIX):
2672       dev_path = dev_path[len(cls._DEV_PREFIX):]
2673     dev_path = dev_path.replace("/", "_")
2674     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2675     return fpath
2676
2677   @classmethod
2678   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2679     """Updates the cache information for a given device.
2680
2681     @type dev_path: str
2682     @param dev_path: the pathname of the device
2683     @type owner: str
2684     @param owner: the owner (instance name) of the device
2685     @type on_primary: bool
2686     @param on_primary: whether this is the primary
2687         node nor not
2688     @type iv_name: str
2689     @param iv_name: the instance-visible name of the
2690         device, as in objects.Disk.iv_name
2691
2692     @rtype: None
2693
2694     """
2695     if dev_path is None:
2696       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2697       return
2698     fpath = cls._ConvertPath(dev_path)
2699     if on_primary:
2700       state = "primary"
2701     else:
2702       state = "secondary"
2703     if iv_name is None:
2704       iv_name = "not_visible"
2705     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2706     try:
2707       utils.WriteFile(fpath, data=fdata)
2708     except EnvironmentError, err:
2709       logging.exception("Can't update bdev cache for %s", dev_path)
2710
2711   @classmethod
2712   def RemoveCache(cls, dev_path):
2713     """Remove data for a dev_path.
2714
2715     This is just a wrapper over L{utils.RemoveFile} with a converted
2716     path name and logging.
2717
2718     @type dev_path: str
2719     @param dev_path: the pathname of the device
2720
2721     @rtype: None
2722
2723     """
2724     if dev_path is None:
2725       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2726       return
2727     fpath = cls._ConvertPath(dev_path)
2728     try:
2729       utils.RemoveFile(fpath)
2730     except EnvironmentError, err:
2731       logging.exception("Can't update bdev cache for %s", dev_path)