Merge branch 'master' into next
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26
27 """
28
29
30 import os
31 import os.path
32 import shutil
33 import time
34 import stat
35 import errno
36 import re
37 import subprocess
38 import random
39 import logging
40 import tempfile
41 import zlib
42 import base64
43
44 from ganeti import errors
45 from ganeti import utils
46 from ganeti import ssh
47 from ganeti import hypervisor
48 from ganeti import constants
49 from ganeti import bdev
50 from ganeti import objects
51 from ganeti import ssconf
52
53
54 def _GetConfig():
55   """Simple wrapper to return a SimpleStore.
56
57   @rtype: L{ssconf.SimpleStore}
58   @return: a SimpleStore instance
59
60   """
61   return ssconf.SimpleStore()
62
63
64 def _GetSshRunner(cluster_name):
65   """Simple wrapper to return an SshRunner.
66
67   @type cluster_name: str
68   @param cluster_name: the cluster name, which is needed
69       by the SshRunner constructor
70   @rtype: L{ssh.SshRunner}
71   @return: an SshRunner instance
72
73   """
74   return ssh.SshRunner(cluster_name)
75
76
77 def _Decompress(data):
78   """Unpacks data compressed by the RPC client.
79
80   @type data: list or tuple
81   @param data: Data sent by RPC client
82   @rtype: str
83   @return: Decompressed data
84
85   """
86   assert isinstance(data, (list, tuple))
87   assert len(data) == 2
88   (encoding, content) = data
89   if encoding == constants.RPC_ENCODING_NONE:
90     return content
91   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
92     return zlib.decompress(base64.b64decode(content))
93   else:
94     raise AssertionError("Unknown data encoding")
95
96
97 def _CleanDirectory(path, exclude=None):
98   """Removes all regular files in a directory.
99
100   @type path: str
101   @param path: the directory to clean
102   @type exclude: list
103   @param exclude: list of files to be excluded, defaults
104       to the empty list
105
106   """
107   if not os.path.isdir(path):
108     return
109   if exclude is None:
110     exclude = []
111   else:
112     # Normalize excluded paths
113     exclude = [os.path.normpath(i) for i in exclude]
114
115   for rel_name in utils.ListVisibleFiles(path):
116     full_name = os.path.normpath(os.path.join(path, rel_name))
117     if full_name in exclude:
118       continue
119     if os.path.isfile(full_name) and not os.path.islink(full_name):
120       utils.RemoveFile(full_name)
121
122
123 def _BuildUploadFileList():
124   """Build the list of allowed upload files.
125
126   This is abstracted so that it's built only once at module import time.
127
128   """
129   return frozenset([
130       constants.CLUSTER_CONF_FILE,
131       constants.ETC_HOSTS,
132       constants.SSH_KNOWN_HOSTS_FILE,
133       constants.VNC_PASSWORD_FILE,
134       ])
135
136
137 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
138
139
140 def JobQueuePurge():
141   """Removes job queue files and archived jobs.
142
143   @rtype: None
144
145   """
146   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
147   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
148
149
150 def GetMasterInfo():
151   """Returns master information.
152
153   This is an utility function to compute master information, either
154   for consumption here or from the node daemon.
155
156   @rtype: tuple
157   @return: (master_netdev, master_ip, master_name) if we have a good
158       configuration, otherwise (None, None, None)
159
160   """
161   try:
162     cfg = _GetConfig()
163     master_netdev = cfg.GetMasterNetdev()
164     master_ip = cfg.GetMasterIP()
165     master_node = cfg.GetMasterNode()
166   except errors.ConfigurationError:
167     logging.exception("Cluster configuration incomplete")
168     return (None, None, None)
169   return (master_netdev, master_ip, master_node)
170
171
172 def StartMaster(start_daemons, no_voting):
173   """Activate local node as master node.
174
175   The function will always try activate the IP address of the master
176   (unless someone else has it). It will also start the master daemons,
177   based on the start_daemons parameter.
178
179   @type start_daemons: boolean
180   @param start_daemons: whther to also start the master
181       daemons (ganeti-masterd and ganeti-rapi)
182   @type no_voting: boolean
183   @param no_voting: whether to start ganeti-masterd without a node vote
184       (if start_daemons is True), but still non-interactively
185   @rtype: None
186
187   """
188   ok = True
189   master_netdev, master_ip, _ = GetMasterInfo()
190   if not master_netdev:
191     return False
192
193   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
194     if utils.OwnIpAddress(master_ip):
195       # we already have the ip:
196       logging.debug("Already started")
197     else:
198       logging.error("Someone else has the master ip, not activating")
199       ok = False
200   else:
201     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
202                            "dev", master_netdev, "label",
203                            "%s:0" % master_netdev])
204     if result.failed:
205       logging.error("Can't activate master IP: %s", result.output)
206       ok = False
207
208     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
209                            "-s", master_ip, master_ip])
210     # we'll ignore the exit code of arping
211
212   # and now start the master and rapi daemons
213   if start_daemons:
214     daemons_params = {
215         'ganeti-masterd': [],
216         'ganeti-rapi': [],
217         }
218     if no_voting:
219       daemons_params['ganeti-masterd'].append('--no-voting')
220       daemons_params['ganeti-masterd'].append('--yes-do-it')
221     for daemon in daemons_params:
222       cmd = [daemon]
223       cmd.extend(daemons_params[daemon])
224       result = utils.RunCmd(cmd)
225       if result.failed:
226         logging.error("Can't start daemon %s: %s", daemon, result.output)
227         ok = False
228   return ok
229
230
231 def StopMaster(stop_daemons):
232   """Deactivate this node as master.
233
234   The function will always try to deactivate the IP address of the
235   master. It will also stop the master daemons depending on the
236   stop_daemons parameter.
237
238   @type stop_daemons: boolean
239   @param stop_daemons: whether to also stop the master daemons
240       (ganeti-masterd and ganeti-rapi)
241   @rtype: None
242
243   """
244   master_netdev, master_ip, _ = GetMasterInfo()
245   if not master_netdev:
246     return False
247
248   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
249                          "dev", master_netdev])
250   if result.failed:
251     logging.error("Can't remove the master IP, error: %s", result.output)
252     # but otherwise ignore the failure
253
254   if stop_daemons:
255     # stop/kill the rapi and the master daemon
256     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
257       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
258
259   return True
260
261
262 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
263   """Joins this node to the cluster.
264
265   This does the following:
266       - updates the hostkeys of the machine (rsa and dsa)
267       - adds the ssh private key to the user
268       - adds the ssh public key to the users' authorized_keys file
269
270   @type dsa: str
271   @param dsa: the DSA private key to write
272   @type dsapub: str
273   @param dsapub: the DSA public key to write
274   @type rsa: str
275   @param rsa: the RSA private key to write
276   @type rsapub: str
277   @param rsapub: the RSA public key to write
278   @type sshkey: str
279   @param sshkey: the SSH private key to write
280   @type sshpub: str
281   @param sshpub: the SSH public key to write
282   @rtype: boolean
283   @return: the success of the operation
284
285   """
286   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
287                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
288                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
289                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
290   for name, content, mode in sshd_keys:
291     utils.WriteFile(name, data=content, mode=mode)
292
293   try:
294     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
295                                                     mkdir=True)
296   except errors.OpExecError, err:
297     msg = "Error while processing user ssh files"
298     logging.exception(msg)
299     return (False, "%s: %s" % (msg, err))
300
301   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
302     utils.WriteFile(name, data=content, mode=0600)
303
304   utils.AddAuthorizedKey(auth_keys, sshpub)
305
306   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
307
308   return (True, "Node added successfully")
309
310
311 def LeaveCluster():
312   """Cleans up and remove the current node.
313
314   This function cleans up and prepares the current node to be removed
315   from the cluster.
316
317   If processing is successful, then it raises an
318   L{errors.QuitGanetiException} which is used as a special case to
319   shutdown the node daemon.
320
321   """
322   _CleanDirectory(constants.DATA_DIR)
323   JobQueuePurge()
324
325   try:
326     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
327   except errors.OpExecError:
328     logging.exception("Error while processing ssh files")
329     return
330
331   f = open(pub_key, 'r')
332   try:
333     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
334   finally:
335     f.close()
336
337   utils.RemoveFile(priv_key)
338   utils.RemoveFile(pub_key)
339
340   # Return a reassuring string to the caller, and quit
341   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
342
343
344 def GetNodeInfo(vgname, hypervisor_type):
345   """Gives back a hash with different information about the node.
346
347   @type vgname: C{string}
348   @param vgname: the name of the volume group to ask for disk space information
349   @type hypervisor_type: C{str}
350   @param hypervisor_type: the name of the hypervisor to ask for
351       memory information
352   @rtype: C{dict}
353   @return: dictionary with the following keys:
354       - vg_size is the size of the configured volume group in MiB
355       - vg_free is the free size of the volume group in MiB
356       - memory_dom0 is the memory allocated for domain0 in MiB
357       - memory_free is the currently available (free) ram in MiB
358       - memory_total is the total number of ram in MiB
359
360   """
361   outputarray = {}
362   vginfo = _GetVGInfo(vgname)
363   outputarray['vg_size'] = vginfo['vg_size']
364   outputarray['vg_free'] = vginfo['vg_free']
365
366   hyper = hypervisor.GetHypervisor(hypervisor_type)
367   hyp_info = hyper.GetNodeInfo()
368   if hyp_info is not None:
369     outputarray.update(hyp_info)
370
371   f = open("/proc/sys/kernel/random/boot_id", 'r')
372   try:
373     outputarray["bootid"] = f.read(128).rstrip("\n")
374   finally:
375     f.close()
376
377   return outputarray
378
379
380 def VerifyNode(what, cluster_name):
381   """Verify the status of the local node.
382
383   Based on the input L{what} parameter, various checks are done on the
384   local node.
385
386   If the I{filelist} key is present, this list of
387   files is checksummed and the file/checksum pairs are returned.
388
389   If the I{nodelist} key is present, we check that we have
390   connectivity via ssh with the target nodes (and check the hostname
391   report).
392
393   If the I{node-net-test} key is present, we check that we have
394   connectivity to the given nodes via both primary IP and, if
395   applicable, secondary IPs.
396
397   @type what: C{dict}
398   @param what: a dictionary of things to check:
399       - filelist: list of files for which to compute checksums
400       - nodelist: list of nodes we should check ssh communication with
401       - node-net-test: list of nodes we should check node daemon port
402         connectivity with
403       - hypervisor: list with hypervisors to run the verify for
404   @rtype: dict
405   @return: a dictionary with the same keys as the input dict, and
406       values representing the result of the checks
407
408   """
409   result = {}
410
411   if constants.NV_HYPERVISOR in what:
412     result[constants.NV_HYPERVISOR] = tmp = {}
413     for hv_name in what[constants.NV_HYPERVISOR]:
414       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
415
416   if constants.NV_FILELIST in what:
417     result[constants.NV_FILELIST] = utils.FingerprintFiles(
418       what[constants.NV_FILELIST])
419
420   if constants.NV_NODELIST in what:
421     result[constants.NV_NODELIST] = tmp = {}
422     random.shuffle(what[constants.NV_NODELIST])
423     for node in what[constants.NV_NODELIST]:
424       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
425       if not success:
426         tmp[node] = message
427
428   if constants.NV_NODENETTEST in what:
429     result[constants.NV_NODENETTEST] = tmp = {}
430     my_name = utils.HostInfo().name
431     my_pip = my_sip = None
432     for name, pip, sip in what[constants.NV_NODENETTEST]:
433       if name == my_name:
434         my_pip = pip
435         my_sip = sip
436         break
437     if not my_pip:
438       tmp[my_name] = ("Can't find my own primary/secondary IP"
439                       " in the node list")
440     else:
441       port = utils.GetNodeDaemonPort()
442       for name, pip, sip in what[constants.NV_NODENETTEST]:
443         fail = []
444         if not utils.TcpPing(pip, port, source=my_pip):
445           fail.append("primary")
446         if sip != pip:
447           if not utils.TcpPing(sip, port, source=my_sip):
448             fail.append("secondary")
449         if fail:
450           tmp[name] = ("failure using the %s interface(s)" %
451                        " and ".join(fail))
452
453   if constants.NV_LVLIST in what:
454     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
455
456   if constants.NV_INSTANCELIST in what:
457     result[constants.NV_INSTANCELIST] = GetInstanceList(
458       what[constants.NV_INSTANCELIST])
459
460   if constants.NV_VGLIST in what:
461     result[constants.NV_VGLIST] = ListVolumeGroups()
462
463   if constants.NV_VERSION in what:
464     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
465                                     constants.RELEASE_VERSION)
466
467   if constants.NV_HVINFO in what:
468     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
469     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
470
471   if constants.NV_DRBDLIST in what:
472     try:
473       used_minors = bdev.DRBD8.GetUsedDevs().keys()
474     except errors.BlockDeviceError, err:
475       logging.warning("Can't get used minors list", exc_info=True)
476       used_minors = str(err)
477     result[constants.NV_DRBDLIST] = used_minors
478
479   return result
480
481
482 def GetVolumeList(vg_name):
483   """Compute list of logical volumes and their size.
484
485   @type vg_name: str
486   @param vg_name: the volume group whose LVs we should list
487   @rtype: dict
488   @return:
489       dictionary of all partions (key) with value being a tuple of
490       their size (in MiB), inactive and online status::
491
492         {'test1': ('20.06', True, True)}
493
494       in case of errors, a string is returned with the error
495       details.
496
497   """
498   lvs = {}
499   sep = '|'
500   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
501                          "--separator=%s" % sep,
502                          "-olv_name,lv_size,lv_attr", vg_name])
503   if result.failed:
504     logging.error("Failed to list logical volumes, lvs output: %s",
505                   result.output)
506     return result.output
507
508   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
509   for line in result.stdout.splitlines():
510     line = line.strip()
511     match = valid_line_re.match(line)
512     if not match:
513       logging.error("Invalid line returned from lvs output: '%s'", line)
514       continue
515     name, size, attr = match.groups()
516     inactive = attr[4] == '-'
517     online = attr[5] == 'o'
518     lvs[name] = (size, inactive, online)
519
520   return lvs
521
522
523 def ListVolumeGroups():
524   """List the volume groups and their size.
525
526   @rtype: dict
527   @return: dictionary with keys volume name and values the
528       size of the volume
529
530   """
531   return utils.ListVolumeGroups()
532
533
534 def NodeVolumes():
535   """List all volumes on this node.
536
537   @rtype: list
538   @return:
539     A list of dictionaries, each having four keys:
540       - name: the logical volume name,
541       - size: the size of the logical volume
542       - dev: the physical device on which the LV lives
543       - vg: the volume group to which it belongs
544
545     In case of errors, we return an empty list and log the
546     error.
547
548     Note that since a logical volume can live on multiple physical
549     volumes, the resulting list might include a logical volume
550     multiple times.
551
552   """
553   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
554                          "--separator=|",
555                          "--options=lv_name,lv_size,devices,vg_name"])
556   if result.failed:
557     logging.error("Failed to list logical volumes, lvs output: %s",
558                   result.output)
559     return []
560
561   def parse_dev(dev):
562     if '(' in dev:
563       return dev.split('(')[0]
564     else:
565       return dev
566
567   def map_line(line):
568     return {
569       'name': line[0].strip(),
570       'size': line[1].strip(),
571       'dev': parse_dev(line[2].strip()),
572       'vg': line[3].strip(),
573     }
574
575   return [map_line(line.split('|')) for line in result.stdout.splitlines()
576           if line.count('|') >= 3]
577
578
579 def BridgesExist(bridges_list):
580   """Check if a list of bridges exist on the current node.
581
582   @rtype: boolean
583   @return: C{True} if all of them exist, C{False} otherwise
584
585   """
586   for bridge in bridges_list:
587     if not utils.BridgeExists(bridge):
588       return False
589
590   return True
591
592
593 def GetInstanceList(hypervisor_list):
594   """Provides a list of instances.
595
596   @type hypervisor_list: list
597   @param hypervisor_list: the list of hypervisors to query information
598
599   @rtype: list
600   @return: a list of all running instances on the current node
601     - instance1.example.com
602     - instance2.example.com
603
604   """
605   results = []
606   for hname in hypervisor_list:
607     try:
608       names = hypervisor.GetHypervisor(hname).ListInstances()
609       results.extend(names)
610     except errors.HypervisorError:
611       logging.exception("Error enumerating instances for hypevisor %s", hname)
612       raise
613
614   return results
615
616
617 def GetInstanceInfo(instance, hname):
618   """Gives back the information about an instance as a dictionary.
619
620   @type instance: string
621   @param instance: the instance name
622   @type hname: string
623   @param hname: the hypervisor type of the instance
624
625   @rtype: dict
626   @return: dictionary with the following keys:
627       - memory: memory size of instance (int)
628       - state: xen state of instance (string)
629       - time: cpu time of instance (float)
630
631   """
632   output = {}
633
634   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
635   if iinfo is not None:
636     output['memory'] = iinfo[2]
637     output['state'] = iinfo[4]
638     output['time'] = iinfo[5]
639
640   return output
641
642
643 def GetInstanceMigratable(instance):
644   """Gives whether an instance can be migrated.
645
646   @type instance: L{objects.Instance}
647   @param instance: object representing the instance to be checked.
648
649   @rtype: tuple
650   @return: tuple of (result, description) where:
651       - result: whether the instance can be migrated or not
652       - description: a description of the issue, if relevant
653
654   """
655   hyper = hypervisor.GetHypervisor(instance.hypervisor)
656   if instance.name not in hyper.ListInstances():
657     return (False, 'not running')
658
659   for idx in range(len(instance.disks)):
660     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
661     if not os.path.islink(link_name):
662       return (False, 'not restarted since ganeti 1.2.5')
663
664   return (True, '')
665
666
667 def GetAllInstancesInfo(hypervisor_list):
668   """Gather data about all instances.
669
670   This is the equivalent of L{GetInstanceInfo}, except that it
671   computes data for all instances at once, thus being faster if one
672   needs data about more than one instance.
673
674   @type hypervisor_list: list
675   @param hypervisor_list: list of hypervisors to query for instance data
676
677   @rtype: dict
678   @return: dictionary of instance: data, with data having the following keys:
679       - memory: memory size of instance (int)
680       - state: xen state of instance (string)
681       - time: cpu time of instance (float)
682       - vcpus: the number of vcpus
683
684   """
685   output = {}
686
687   for hname in hypervisor_list:
688     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
689     if iinfo:
690       for name, inst_id, memory, vcpus, state, times in iinfo:
691         value = {
692           'memory': memory,
693           'vcpus': vcpus,
694           'state': state,
695           'time': times,
696           }
697         if name in output:
698           # we only check static parameters, like memory and vcpus,
699           # and not state and time which can change between the
700           # invocations of the different hypervisors
701           for key in 'memory', 'vcpus':
702             if value[key] != output[name][key]:
703               raise errors.HypervisorError("Instance %s is running twice"
704                                            " with different parameters" % name)
705         output[name] = value
706
707   return output
708
709
710 def InstanceOsAdd(instance):
711   """Add an OS to an instance.
712
713   @type instance: L{objects.Instance}
714   @param instance: Instance whose OS is to be installed
715   @rtype: boolean
716   @return: the success of the operation
717
718   """
719   try:
720     inst_os = OSFromDisk(instance.os)
721   except errors.InvalidOS, err:
722     os_name, os_dir, os_err = err.args
723     if os_dir is None:
724       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
725     else:
726       return (False, "Error parsing OS '%s' in directory %s: %s" %
727               (os_name, os_dir, os_err))
728
729   create_env = OSEnvironment(instance)
730
731   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
732                                      instance.name, int(time.time()))
733
734   result = utils.RunCmd([inst_os.create_script], env=create_env,
735                         cwd=inst_os.path, output=logfile,)
736   if result.failed:
737     logging.error("os create command '%s' returned error: %s, logfile: %s,"
738                   " output: %s", result.cmd, result.fail_reason, logfile,
739                   result.output)
740     lines = [utils.SafeEncode(val)
741              for val in utils.TailFile(logfile, lines=20)]
742     return (False, "OS create script failed (%s), last lines in the"
743             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
744
745   return (True, "Successfully installed")
746
747
748 def RunRenameInstance(instance, old_name):
749   """Run the OS rename script for an instance.
750
751   @type instance: L{objects.Instance}
752   @param instance: Instance whose OS is to be installed
753   @type old_name: string
754   @param old_name: previous instance name
755   @rtype: boolean
756   @return: the success of the operation
757
758   """
759   inst_os = OSFromDisk(instance.os)
760
761   rename_env = OSEnvironment(instance)
762   rename_env['OLD_INSTANCE_NAME'] = old_name
763
764   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
765                                            old_name,
766                                            instance.name, int(time.time()))
767
768   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
769                         cwd=inst_os.path, output=logfile)
770
771   if result.failed:
772     logging.error("os create command '%s' returned error: %s output: %s",
773                   result.cmd, result.fail_reason, result.output)
774     lines = [utils.SafeEncode(val)
775              for val in utils.TailFile(logfile, lines=20)]
776     return (False, "OS rename script failed (%s), last lines in the"
777             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
778
779   return (True, "Rename successful")
780
781
782 def _GetVGInfo(vg_name):
783   """Get information about the volume group.
784
785   @type vg_name: str
786   @param vg_name: the volume group which we query
787   @rtype: dict
788   @return:
789     A dictionary with the following keys:
790       - C{vg_size} is the total size of the volume group in MiB
791       - C{vg_free} is the free size of the volume group in MiB
792       - C{pv_count} are the number of physical disks in that VG
793
794     If an error occurs during gathering of data, we return the same dict
795     with keys all set to None.
796
797   """
798   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
799
800   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
801                          "--nosuffix", "--units=m", "--separator=:", vg_name])
802
803   if retval.failed:
804     logging.error("volume group %s not present", vg_name)
805     return retdic
806   valarr = retval.stdout.strip().rstrip(':').split(':')
807   if len(valarr) == 3:
808     try:
809       retdic = {
810         "vg_size": int(round(float(valarr[0]), 0)),
811         "vg_free": int(round(float(valarr[1]), 0)),
812         "pv_count": int(valarr[2]),
813         }
814     except ValueError, err:
815       logging.exception("Fail to parse vgs output")
816   else:
817     logging.error("vgs output has the wrong number of fields (expected"
818                   " three): %s", str(valarr))
819   return retdic
820
821
822 def _GetBlockDevSymlinkPath(instance_name, idx):
823   return os.path.join(constants.DISK_LINKS_DIR,
824                       "%s:%d" % (instance_name, idx))
825
826
827 def _SymlinkBlockDev(instance_name, device_path, idx):
828   """Set up symlinks to a instance's block device.
829
830   This is an auxiliary function run when an instance is start (on the primary
831   node) or when an instance is migrated (on the target node).
832
833
834   @param instance_name: the name of the target instance
835   @param device_path: path of the physical block device, on the node
836   @param idx: the disk index
837   @return: absolute path to the disk's symlink
838
839   """
840   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
841   try:
842     os.symlink(device_path, link_name)
843   except OSError, err:
844     if err.errno == errno.EEXIST:
845       if (not os.path.islink(link_name) or
846           os.readlink(link_name) != device_path):
847         os.remove(link_name)
848         os.symlink(device_path, link_name)
849     else:
850       raise
851
852   return link_name
853
854
855 def _RemoveBlockDevLinks(instance_name, disks):
856   """Remove the block device symlinks belonging to the given instance.
857
858   """
859   for idx, disk in enumerate(disks):
860     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
861     if os.path.islink(link_name):
862       try:
863         os.remove(link_name)
864       except OSError:
865         logging.exception("Can't remove symlink '%s'", link_name)
866
867
868 def _GatherAndLinkBlockDevs(instance):
869   """Set up an instance's block device(s).
870
871   This is run on the primary node at instance startup. The block
872   devices must be already assembled.
873
874   @type instance: L{objects.Instance}
875   @param instance: the instance whose disks we shoul assemble
876   @rtype: list
877   @return: list of (disk_object, device_path)
878
879   """
880   block_devices = []
881   for idx, disk in enumerate(instance.disks):
882     device = _RecursiveFindBD(disk)
883     if device is None:
884       raise errors.BlockDeviceError("Block device '%s' is not set up." %
885                                     str(disk))
886     device.Open()
887     try:
888       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
889     except OSError, e:
890       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
891                                     e.strerror)
892
893     block_devices.append((disk, link_name))
894
895   return block_devices
896
897
898 def StartInstance(instance):
899   """Start an instance.
900
901   @type instance: L{objects.Instance}
902   @param instance: the instance object
903   @rtype: boolean
904   @return: whether the startup was successful or not
905
906   """
907   running_instances = GetInstanceList([instance.hypervisor])
908
909   if instance.name in running_instances:
910     return (True, "Already running")
911
912   try:
913     block_devices = _GatherAndLinkBlockDevs(instance)
914     hyper = hypervisor.GetHypervisor(instance.hypervisor)
915     hyper.StartInstance(instance, block_devices)
916   except errors.BlockDeviceError, err:
917     logging.exception("Failed to start instance")
918     return (False, "Block device error: %s" % str(err))
919   except errors.HypervisorError, err:
920     logging.exception("Failed to start instance")
921     _RemoveBlockDevLinks(instance.name, instance.disks)
922     return (False, "Hypervisor error: %s" % str(err))
923
924   return (True, "Instance started successfully")
925
926
927 def InstanceShutdown(instance):
928   """Shut an instance down.
929
930   @note: this functions uses polling with a hardcoded timeout.
931
932   @type instance: L{objects.Instance}
933   @param instance: the instance object
934   @rtype: boolean
935   @return: whether the startup was successful or not
936
937   """
938   hv_name = instance.hypervisor
939   running_instances = GetInstanceList([hv_name])
940
941   if instance.name not in running_instances:
942     return (True, "Instance already stopped")
943
944   hyper = hypervisor.GetHypervisor(hv_name)
945   try:
946     hyper.StopInstance(instance)
947   except errors.HypervisorError, err:
948     msg = "Failed to stop instance %s: %s" % (instance.name, err)
949     logging.error(msg)
950     return (False, msg)
951
952   # test every 10secs for 2min
953
954   time.sleep(1)
955   for _ in range(11):
956     if instance.name not in GetInstanceList([hv_name]):
957       break
958     time.sleep(10)
959   else:
960     # the shutdown did not succeed
961     logging.error("Shutdown of '%s' unsuccessful, using destroy",
962                   instance.name)
963
964     try:
965       hyper.StopInstance(instance, force=True)
966     except errors.HypervisorError, err:
967       msg = "Failed to force stop instance %s: %s" % (instance.name, err)
968       logging.error(msg)
969       return (False, msg)
970
971     time.sleep(1)
972     if instance.name in GetInstanceList([hv_name]):
973       msg = ("Could not shutdown instance %s even by destroy" %
974              instance.name)
975       logging.error(msg)
976       return (False, msg)
977
978   _RemoveBlockDevLinks(instance.name, instance.disks)
979
980   return (True, "Instance has been shutdown successfully")
981
982
983 def InstanceReboot(instance, reboot_type):
984   """Reboot an instance.
985
986   @type instance: L{objects.Instance}
987   @param instance: the instance object to reboot
988   @type reboot_type: str
989   @param reboot_type: the type of reboot, one the following
990     constants:
991       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
992         instance OS, do not recreate the VM
993       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
994         restart the VM (at the hypervisor level)
995       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
996         not accepted here, since that mode is handled differently, in
997         cmdlib, and translates into full stop and start of the
998         instance (instead of a call_instance_reboot RPC)
999   @rtype: boolean
1000   @return: the success of the operation
1001
1002   """
1003   running_instances = GetInstanceList([instance.hypervisor])
1004
1005   if instance.name not in running_instances:
1006     msg = "Cannot reboot instance %s that is not running" % instance.name
1007     logging.error(msg)
1008     return (False, msg)
1009
1010   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1011   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1012     try:
1013       hyper.RebootInstance(instance)
1014     except errors.HypervisorError, err:
1015       msg = "Failed to soft reboot instance %s: %s" % (instance.name, err)
1016       logging.error(msg)
1017       return (False, msg)
1018   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1019     try:
1020       stop_result = InstanceShutdown(instance)
1021       if not stop_result[0]:
1022         return stop_result
1023       return StartInstance(instance)
1024     except errors.HypervisorError, err:
1025       msg = "Failed to hard reboot instance %s: %s" % (instance.name, err)
1026       logging.error(msg)
1027       return (False, msg)
1028   else:
1029     return (False, "Invalid reboot_type received: %s" % (reboot_type,))
1030
1031   return (True, "Reboot successful")
1032
1033
1034 def MigrationInfo(instance):
1035   """Gather information about an instance to be migrated.
1036
1037   @type instance: L{objects.Instance}
1038   @param instance: the instance definition
1039
1040   """
1041   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1042   try:
1043     info = hyper.MigrationInfo(instance)
1044   except errors.HypervisorError, err:
1045     msg = "Failed to fetch migration information"
1046     logging.exception(msg)
1047     return (False, '%s: %s' % (msg, err))
1048   return (True, info)
1049
1050
1051 def AcceptInstance(instance, info, target):
1052   """Prepare the node to accept an instance.
1053
1054   @type instance: L{objects.Instance}
1055   @param instance: the instance definition
1056   @type info: string/data (opaque)
1057   @param info: migration information, from the source node
1058   @type target: string
1059   @param target: target host (usually ip), on this node
1060
1061   """
1062   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1063   try:
1064     hyper.AcceptInstance(instance, info, target)
1065   except errors.HypervisorError, err:
1066     msg = "Failed to accept instance"
1067     logging.exception(msg)
1068     return (False, '%s: %s' % (msg, err))
1069   return (True, "Accept successful")
1070
1071
1072 def FinalizeMigration(instance, info, success):
1073   """Finalize any preparation to accept an instance.
1074
1075   @type instance: L{objects.Instance}
1076   @param instance: the instance definition
1077   @type info: string/data (opaque)
1078   @param info: migration information, from the source node
1079   @type success: boolean
1080   @param success: whether the migration was a success or a failure
1081
1082   """
1083   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1084   try:
1085     hyper.FinalizeMigration(instance, info, success)
1086   except errors.HypervisorError, err:
1087     msg = "Failed to finalize migration"
1088     logging.exception(msg)
1089     return (False, '%s: %s' % (msg, err))
1090   return (True, "Migration Finalized")
1091
1092
1093 def MigrateInstance(instance, target, live):
1094   """Migrates an instance to another node.
1095
1096   @type instance: L{objects.Instance}
1097   @param instance: the instance definition
1098   @type target: string
1099   @param target: the target node name
1100   @type live: boolean
1101   @param live: whether the migration should be done live or not (the
1102       interpretation of this parameter is left to the hypervisor)
1103   @rtype: tuple
1104   @return: a tuple of (success, msg) where:
1105       - succes is a boolean denoting the success/failure of the operation
1106       - msg is a string with details in case of failure
1107
1108   """
1109   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1110
1111   try:
1112     hyper.MigrateInstance(instance.name, target, live)
1113   except errors.HypervisorError, err:
1114     msg = "Failed to migrate instance"
1115     logging.exception(msg)
1116     return (False, "%s: %s" % (msg, err))
1117   return (True, "Migration successful")
1118
1119
1120 def BlockdevCreate(disk, size, owner, on_primary, info):
1121   """Creates a block device for an instance.
1122
1123   @type disk: L{objects.Disk}
1124   @param disk: the object describing the disk we should create
1125   @type size: int
1126   @param size: the size of the physical underlying device, in MiB
1127   @type owner: str
1128   @param owner: the name of the instance for which disk is created,
1129       used for device cache data
1130   @type on_primary: boolean
1131   @param on_primary:  indicates if it is the primary node or not
1132   @type info: string
1133   @param info: string that will be sent to the physical device
1134       creation, used for example to set (LVM) tags on LVs
1135
1136   @return: the new unique_id of the device (this can sometime be
1137       computed only after creation), or None. On secondary nodes,
1138       it's not required to return anything.
1139
1140   """
1141   clist = []
1142   if disk.children:
1143     for child in disk.children:
1144       try:
1145         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1146       except errors.BlockDeviceError, err:
1147         errmsg = "Can't assemble device %s: %s" % (child, err)
1148         logging.error(errmsg)
1149         return False, errmsg
1150       if on_primary or disk.AssembleOnSecondary():
1151         # we need the children open in case the device itself has to
1152         # be assembled
1153         try:
1154           crdev.Open()
1155         except errors.BlockDeviceError, err:
1156           errmsg = "Can't make child '%s' read-write: %s" % (child, err)
1157           logging.error(errmsg)
1158           return False, errmsg
1159       clist.append(crdev)
1160
1161   try:
1162     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1163   except errors.BlockDeviceError, err:
1164     return False, "Can't create block device: %s" % str(err)
1165
1166   if on_primary or disk.AssembleOnSecondary():
1167     try:
1168       device.Assemble()
1169     except errors.BlockDeviceError, err:
1170       errmsg = ("Can't assemble device after creation, very"
1171                 " unusual event: %s" % str(err))
1172       logging.error(errmsg)
1173       return False, errmsg
1174     device.SetSyncSpeed(constants.SYNC_SPEED)
1175     if on_primary or disk.OpenOnSecondary():
1176       try:
1177         device.Open(force=True)
1178       except errors.BlockDeviceError, err:
1179         errmsg = ("Can't make device r/w after creation, very"
1180                   " unusual event: %s" % str(err))
1181         logging.error(errmsg)
1182         return False, errmsg
1183     DevCacheManager.UpdateCache(device.dev_path, owner,
1184                                 on_primary, disk.iv_name)
1185
1186   device.SetInfo(info)
1187
1188   physical_id = device.unique_id
1189   return True, physical_id
1190
1191
1192 def BlockdevRemove(disk):
1193   """Remove a block device.
1194
1195   @note: This is intended to be called recursively.
1196
1197   @type disk: L{objects.Disk}
1198   @param disk: the disk object we should remove
1199   @rtype: boolean
1200   @return: the success of the operation
1201
1202   """
1203   msgs = []
1204   result = True
1205   try:
1206     rdev = _RecursiveFindBD(disk)
1207   except errors.BlockDeviceError, err:
1208     # probably can't attach
1209     logging.info("Can't attach to device %s in remove", disk)
1210     rdev = None
1211   if rdev is not None:
1212     r_path = rdev.dev_path
1213     try:
1214       rdev.Remove()
1215     except errors.BlockDeviceError, err:
1216       msgs.append(str(err))
1217       result = False
1218     if result:
1219       DevCacheManager.RemoveCache(r_path)
1220
1221   if disk.children:
1222     for child in disk.children:
1223       c_status, c_msg = BlockdevRemove(child)
1224       result = result and c_status
1225       if c_msg: # not an empty message
1226         msgs.append(c_msg)
1227
1228   return (result, "; ".join(msgs))
1229
1230
1231 def _RecursiveAssembleBD(disk, owner, as_primary):
1232   """Activate a block device for an instance.
1233
1234   This is run on the primary and secondary nodes for an instance.
1235
1236   @note: this function is called recursively.
1237
1238   @type disk: L{objects.Disk}
1239   @param disk: the disk we try to assemble
1240   @type owner: str
1241   @param owner: the name of the instance which owns the disk
1242   @type as_primary: boolean
1243   @param as_primary: if we should make the block device
1244       read/write
1245
1246   @return: the assembled device or None (in case no device
1247       was assembled)
1248   @raise errors.BlockDeviceError: in case there is an error
1249       during the activation of the children or the device
1250       itself
1251
1252   """
1253   children = []
1254   if disk.children:
1255     mcn = disk.ChildrenNeeded()
1256     if mcn == -1:
1257       mcn = 0 # max number of Nones allowed
1258     else:
1259       mcn = len(disk.children) - mcn # max number of Nones
1260     for chld_disk in disk.children:
1261       try:
1262         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1263       except errors.BlockDeviceError, err:
1264         if children.count(None) >= mcn:
1265           raise
1266         cdev = None
1267         logging.error("Error in child activation (but continuing): %s",
1268                       str(err))
1269       children.append(cdev)
1270
1271   if as_primary or disk.AssembleOnSecondary():
1272     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1273     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1274     result = r_dev
1275     if as_primary or disk.OpenOnSecondary():
1276       r_dev.Open()
1277     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1278                                 as_primary, disk.iv_name)
1279
1280   else:
1281     result = True
1282   return result
1283
1284
1285 def BlockdevAssemble(disk, owner, as_primary):
1286   """Activate a block device for an instance.
1287
1288   This is a wrapper over _RecursiveAssembleBD.
1289
1290   @rtype: str or boolean
1291   @return: a C{/dev/...} path for primary nodes, and
1292       C{True} for secondary nodes
1293
1294   """
1295   status = True
1296   result = "no error information"
1297   try:
1298     result = _RecursiveAssembleBD(disk, owner, as_primary)
1299     if isinstance(result, bdev.BlockDev):
1300       result = result.dev_path
1301   except errors.BlockDeviceError, err:
1302     result = "Error while assembling disk: %s" % str(err)
1303     status = False
1304   return (status, result)
1305
1306
1307 def BlockdevShutdown(disk):
1308   """Shut down a block device.
1309
1310   First, if the device is assembled (Attach() is successful), then
1311   the device is shutdown. Then the children of the device are
1312   shutdown.
1313
1314   This function is called recursively. Note that we don't cache the
1315   children or such, as oppossed to assemble, shutdown of different
1316   devices doesn't require that the upper device was active.
1317
1318   @type disk: L{objects.Disk}
1319   @param disk: the description of the disk we should
1320       shutdown
1321   @rtype: boolean
1322   @return: the success of the operation
1323
1324   """
1325   msgs = []
1326   result = True
1327   r_dev = _RecursiveFindBD(disk)
1328   if r_dev is not None:
1329     r_path = r_dev.dev_path
1330     try:
1331       r_dev.Shutdown()
1332       DevCacheManager.RemoveCache(r_path)
1333     except errors.BlockDeviceError, err:
1334       msgs.append(str(err))
1335       result = False
1336
1337   if disk.children:
1338     for child in disk.children:
1339       c_status, c_msg = BlockdevShutdown(child)
1340       result = result and c_status
1341       if c_msg: # not an empty message
1342         msgs.append(c_msg)
1343
1344   return (result, "; ".join(msgs))
1345
1346
1347 def BlockdevAddchildren(parent_cdev, new_cdevs):
1348   """Extend a mirrored block device.
1349
1350   @type parent_cdev: L{objects.Disk}
1351   @param parent_cdev: the disk to which we should add children
1352   @type new_cdevs: list of L{objects.Disk}
1353   @param new_cdevs: the list of children which we should add
1354   @rtype: boolean
1355   @return: the success of the operation
1356
1357   """
1358   parent_bdev = _RecursiveFindBD(parent_cdev)
1359   if parent_bdev is None:
1360     logging.error("Can't find parent device")
1361     return False
1362   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1363   if new_bdevs.count(None) > 0:
1364     logging.error("Can't find new device(s) to add: %s:%s",
1365                   new_bdevs, new_cdevs)
1366     return False
1367   parent_bdev.AddChildren(new_bdevs)
1368   return True
1369
1370
1371 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1372   """Shrink a mirrored block device.
1373
1374   @type parent_cdev: L{objects.Disk}
1375   @param parent_cdev: the disk from which we should remove children
1376   @type new_cdevs: list of L{objects.Disk}
1377   @param new_cdevs: the list of children which we should remove
1378   @rtype: boolean
1379   @return: the success of the operation
1380
1381   """
1382   parent_bdev = _RecursiveFindBD(parent_cdev)
1383   if parent_bdev is None:
1384     logging.error("Can't find parent in remove children: %s", parent_cdev)
1385     return False
1386   devs = []
1387   for disk in new_cdevs:
1388     rpath = disk.StaticDevPath()
1389     if rpath is None:
1390       bd = _RecursiveFindBD(disk)
1391       if bd is None:
1392         logging.error("Can't find dynamic device %s while removing children",
1393                       disk)
1394         return False
1395       else:
1396         devs.append(bd.dev_path)
1397     else:
1398       devs.append(rpath)
1399   parent_bdev.RemoveChildren(devs)
1400   return True
1401
1402
1403 def BlockdevGetmirrorstatus(disks):
1404   """Get the mirroring status of a list of devices.
1405
1406   @type disks: list of L{objects.Disk}
1407   @param disks: the list of disks which we should query
1408   @rtype: disk
1409   @return:
1410       a list of (mirror_done, estimated_time) tuples, which
1411       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1412   @raise errors.BlockDeviceError: if any of the disks cannot be
1413       found
1414
1415   """
1416   stats = []
1417   for dsk in disks:
1418     rbd = _RecursiveFindBD(dsk)
1419     if rbd is None:
1420       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1421     stats.append(rbd.CombinedSyncStatus())
1422   return stats
1423
1424
1425 def _RecursiveFindBD(disk):
1426   """Check if a device is activated.
1427
1428   If so, return information about the real device.
1429
1430   @type disk: L{objects.Disk}
1431   @param disk: the disk object we need to find
1432
1433   @return: None if the device can't be found,
1434       otherwise the device instance
1435
1436   """
1437   children = []
1438   if disk.children:
1439     for chdisk in disk.children:
1440       children.append(_RecursiveFindBD(chdisk))
1441
1442   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1443
1444
1445 def BlockdevFind(disk):
1446   """Check if a device is activated.
1447
1448   If it is, return information about the real device.
1449
1450   @type disk: L{objects.Disk}
1451   @param disk: the disk to find
1452   @rtype: None or tuple
1453   @return: None if the disk cannot be found, otherwise a
1454       tuple (device_path, major, minor, sync_percent,
1455       estimated_time, is_degraded)
1456
1457   """
1458   try:
1459     rbd = _RecursiveFindBD(disk)
1460   except errors.BlockDeviceError, err:
1461     return (False, str(err))
1462   if rbd is None:
1463     return (True, None)
1464   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1465
1466
1467 def BlockdevGetsize(disks):
1468   """Computes the size of the given disks.
1469
1470   If a disk is not found, returns None instead.
1471
1472   @type disks: list of L{objects.Disk}
1473   @param disks: the list of disk to compute the size for
1474   @rtype: list
1475   @return: list with elements None if the disk cannot be found,
1476       otherwise the size
1477
1478   """
1479   result = []
1480   for cf in disks:
1481     try:
1482       rbd = _RecursiveFindBD(cf)
1483     except errors.BlockDeviceError, err:
1484       result.append(None)
1485       continue
1486     if rbd is None:
1487       result.append(None)
1488     else:
1489       result.append(rbd.GetActualSize())
1490   return result
1491
1492
1493 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1494   """Write a file to the filesystem.
1495
1496   This allows the master to overwrite(!) a file. It will only perform
1497   the operation if the file belongs to a list of configuration files.
1498
1499   @type file_name: str
1500   @param file_name: the target file name
1501   @type data: str
1502   @param data: the new contents of the file
1503   @type mode: int
1504   @param mode: the mode to give the file (can be None)
1505   @type uid: int
1506   @param uid: the owner of the file (can be -1 for default)
1507   @type gid: int
1508   @param gid: the group of the file (can be -1 for default)
1509   @type atime: float
1510   @param atime: the atime to set on the file (can be None)
1511   @type mtime: float
1512   @param mtime: the mtime to set on the file (can be None)
1513   @rtype: boolean
1514   @return: the success of the operation; errors are logged
1515       in the node daemon log
1516
1517   """
1518   if not os.path.isabs(file_name):
1519     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1520                   file_name)
1521     return False
1522
1523   if file_name not in _ALLOWED_UPLOAD_FILES:
1524     logging.error("Filename passed to UploadFile not in allowed"
1525                  " upload targets: '%s'", file_name)
1526     return False
1527
1528   raw_data = _Decompress(data)
1529
1530   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1531                   atime=atime, mtime=mtime)
1532   return True
1533
1534
1535 def WriteSsconfFiles(values):
1536   """Update all ssconf files.
1537
1538   Wrapper around the SimpleStore.WriteFiles.
1539
1540   """
1541   ssconf.SimpleStore().WriteFiles(values)
1542
1543
1544 def _ErrnoOrStr(err):
1545   """Format an EnvironmentError exception.
1546
1547   If the L{err} argument has an errno attribute, it will be looked up
1548   and converted into a textual C{E...} description. Otherwise the
1549   string representation of the error will be returned.
1550
1551   @type err: L{EnvironmentError}
1552   @param err: the exception to format
1553
1554   """
1555   if hasattr(err, 'errno'):
1556     detail = errno.errorcode[err.errno]
1557   else:
1558     detail = str(err)
1559   return detail
1560
1561
1562 def _OSOndiskVersion(name, os_dir):
1563   """Compute and return the API version of a given OS.
1564
1565   This function will try to read the API version of the OS given by
1566   the 'name' parameter and residing in the 'os_dir' directory.
1567
1568   @type name: str
1569   @param name: the OS name we should look for
1570   @type os_dir: str
1571   @param os_dir: the directory inwhich we should look for the OS
1572   @rtype: int or None
1573   @return:
1574       Either an integer denoting the version or None in the
1575       case when this is not a valid OS name.
1576   @raise errors.InvalidOS: if the OS cannot be found
1577
1578   """
1579   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1580
1581   try:
1582     st = os.stat(api_file)
1583   except EnvironmentError, err:
1584     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1585                            " found (%s)" % _ErrnoOrStr(err))
1586
1587   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1588     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1589                            " a regular file")
1590
1591   try:
1592     f = open(api_file)
1593     try:
1594       api_versions = f.readlines()
1595     finally:
1596       f.close()
1597   except EnvironmentError, err:
1598     raise errors.InvalidOS(name, os_dir, "error while reading the"
1599                            " API version (%s)" % _ErrnoOrStr(err))
1600
1601   api_versions = [version.strip() for version in api_versions]
1602   try:
1603     api_versions = [int(version) for version in api_versions]
1604   except (TypeError, ValueError), err:
1605     raise errors.InvalidOS(name, os_dir,
1606                            "API version is not integer (%s)" % str(err))
1607
1608   return api_versions
1609
1610
1611 def DiagnoseOS(top_dirs=None):
1612   """Compute the validity for all OSes.
1613
1614   @type top_dirs: list
1615   @param top_dirs: the list of directories in which to
1616       search (if not given defaults to
1617       L{constants.OS_SEARCH_PATH})
1618   @rtype: list of L{objects.OS}
1619   @return: an OS object for each name in all the given
1620       directories
1621
1622   """
1623   if top_dirs is None:
1624     top_dirs = constants.OS_SEARCH_PATH
1625
1626   result = []
1627   for dir_name in top_dirs:
1628     if os.path.isdir(dir_name):
1629       try:
1630         f_names = utils.ListVisibleFiles(dir_name)
1631       except EnvironmentError, err:
1632         logging.exception("Can't list the OS directory %s", dir_name)
1633         break
1634       for name in f_names:
1635         try:
1636           os_inst = OSFromDisk(name, base_dir=dir_name)
1637           result.append(os_inst)
1638         except errors.InvalidOS, err:
1639           result.append(objects.OS.FromInvalidOS(err))
1640
1641   return result
1642
1643
1644 def OSFromDisk(name, base_dir=None):
1645   """Create an OS instance from disk.
1646
1647   This function will return an OS instance if the given name is a
1648   valid OS name. Otherwise, it will raise an appropriate
1649   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1650
1651   @type base_dir: string
1652   @keyword base_dir: Base directory containing OS installations.
1653                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1654   @rtype: L{objects.OS}
1655   @return: the OS instance if we find a valid one
1656   @raise errors.InvalidOS: if we don't find a valid OS
1657
1658   """
1659   if base_dir is None:
1660     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1661     if os_dir is None:
1662       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1663   else:
1664     os_dir = os.path.sep.join([base_dir, name])
1665
1666   api_versions = _OSOndiskVersion(name, os_dir)
1667
1668   if constants.OS_API_VERSION not in api_versions:
1669     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1670                            " (found %s want %s)"
1671                            % (api_versions, constants.OS_API_VERSION))
1672
1673   # OS Scripts dictionary, we will populate it with the actual script names
1674   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1675
1676   for script in os_scripts:
1677     os_scripts[script] = os.path.sep.join([os_dir, script])
1678
1679     try:
1680       st = os.stat(os_scripts[script])
1681     except EnvironmentError, err:
1682       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1683                              (script, _ErrnoOrStr(err)))
1684
1685     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1686       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1687                              script)
1688
1689     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1690       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1691                              script)
1692
1693
1694   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1695                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1696                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1697                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1698                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1699                     api_versions=api_versions)
1700
1701 def OSEnvironment(instance, debug=0):
1702   """Calculate the environment for an os script.
1703
1704   @type instance: L{objects.Instance}
1705   @param instance: target instance for the os script run
1706   @type debug: integer
1707   @param debug: debug level (0 or 1, for OS Api 10)
1708   @rtype: dict
1709   @return: dict of environment variables
1710   @raise errors.BlockDeviceError: if the block device
1711       cannot be found
1712
1713   """
1714   result = {}
1715   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1716   result['INSTANCE_NAME'] = instance.name
1717   result['INSTANCE_OS'] = instance.os
1718   result['HYPERVISOR'] = instance.hypervisor
1719   result['DISK_COUNT'] = '%d' % len(instance.disks)
1720   result['NIC_COUNT'] = '%d' % len(instance.nics)
1721   result['DEBUG_LEVEL'] = '%d' % debug
1722   for idx, disk in enumerate(instance.disks):
1723     real_disk = _RecursiveFindBD(disk)
1724     if real_disk is None:
1725       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1726                                     str(disk))
1727     real_disk.Open()
1728     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1729     result['DISK_%d_ACCESS' % idx] = disk.mode
1730     if constants.HV_DISK_TYPE in instance.hvparams:
1731       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1732         instance.hvparams[constants.HV_DISK_TYPE]
1733     if disk.dev_type in constants.LDS_BLOCK:
1734       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1735     elif disk.dev_type == constants.LD_FILE:
1736       result['DISK_%d_BACKEND_TYPE' % idx] = \
1737         'file:%s' % disk.physical_id[0]
1738   for idx, nic in enumerate(instance.nics):
1739     result['NIC_%d_MAC' % idx] = nic.mac
1740     if nic.ip:
1741       result['NIC_%d_IP' % idx] = nic.ip
1742     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1743     if constants.HV_NIC_TYPE in instance.hvparams:
1744       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1745         instance.hvparams[constants.HV_NIC_TYPE]
1746
1747   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1748     for key, value in source.items():
1749       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1750
1751   return result
1752
1753 def BlockdevGrow(disk, amount):
1754   """Grow a stack of block devices.
1755
1756   This function is called recursively, with the childrens being the
1757   first ones to resize.
1758
1759   @type disk: L{objects.Disk}
1760   @param disk: the disk to be grown
1761   @rtype: (status, result)
1762   @return: a tuple with the status of the operation
1763       (True/False), and the errors message if status
1764       is False
1765
1766   """
1767   r_dev = _RecursiveFindBD(disk)
1768   if r_dev is None:
1769     return False, "Cannot find block device %s" % (disk,)
1770
1771   try:
1772     r_dev.Grow(amount)
1773   except errors.BlockDeviceError, err:
1774     return False, str(err)
1775
1776   return True, None
1777
1778
1779 def BlockdevSnapshot(disk):
1780   """Create a snapshot copy of a block device.
1781
1782   This function is called recursively, and the snapshot is actually created
1783   just for the leaf lvm backend device.
1784
1785   @type disk: L{objects.Disk}
1786   @param disk: the disk to be snapshotted
1787   @rtype: string
1788   @return: snapshot disk path
1789
1790   """
1791   if disk.children:
1792     if len(disk.children) == 1:
1793       # only one child, let's recurse on it
1794       return BlockdevSnapshot(disk.children[0])
1795     else:
1796       # more than one child, choose one that matches
1797       for child in disk.children:
1798         if child.size == disk.size:
1799           # return implies breaking the loop
1800           return BlockdevSnapshot(child)
1801   elif disk.dev_type == constants.LD_LV:
1802     r_dev = _RecursiveFindBD(disk)
1803     if r_dev is not None:
1804       # let's stay on the safe side and ask for the full size, for now
1805       return r_dev.Snapshot(disk.size)
1806     else:
1807       return None
1808   else:
1809     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1810                                  " '%s' of type '%s'" %
1811                                  (disk.unique_id, disk.dev_type))
1812
1813
1814 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1815   """Export a block device snapshot to a remote node.
1816
1817   @type disk: L{objects.Disk}
1818   @param disk: the description of the disk to export
1819   @type dest_node: str
1820   @param dest_node: the destination node to export to
1821   @type instance: L{objects.Instance}
1822   @param instance: the instance object to whom the disk belongs
1823   @type cluster_name: str
1824   @param cluster_name: the cluster name, needed for SSH hostalias
1825   @type idx: int
1826   @param idx: the index of the disk in the instance's disk list,
1827       used to export to the OS scripts environment
1828   @rtype: boolean
1829   @return: the success of the operation
1830
1831   """
1832   export_env = OSEnvironment(instance)
1833
1834   inst_os = OSFromDisk(instance.os)
1835   export_script = inst_os.export_script
1836
1837   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1838                                      instance.name, int(time.time()))
1839   if not os.path.exists(constants.LOG_OS_DIR):
1840     os.mkdir(constants.LOG_OS_DIR, 0750)
1841   real_disk = _RecursiveFindBD(disk)
1842   if real_disk is None:
1843     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1844                                   str(disk))
1845   real_disk.Open()
1846
1847   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1848   export_env['EXPORT_INDEX'] = str(idx)
1849
1850   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1851   destfile = disk.physical_id[1]
1852
1853   # the target command is built out of three individual commands,
1854   # which are joined by pipes; we check each individual command for
1855   # valid parameters
1856   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1857                                inst_os.path, export_script, logfile)
1858
1859   comprcmd = "gzip"
1860
1861   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1862                                 destdir, destdir, destfile)
1863   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1864                                                    constants.GANETI_RUNAS,
1865                                                    destcmd)
1866
1867   # all commands have been checked, so we're safe to combine them
1868   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1869
1870   result = utils.RunCmd(["bash", "-c", command], env=export_env)
1871
1872   if result.failed:
1873     logging.error("os snapshot export command '%s' returned error: %s"
1874                   " output: %s", command, result.fail_reason, result.output)
1875     return False
1876
1877   return True
1878
1879
1880 def FinalizeExport(instance, snap_disks):
1881   """Write out the export configuration information.
1882
1883   @type instance: L{objects.Instance}
1884   @param instance: the instance which we export, used for
1885       saving configuration
1886   @type snap_disks: list of L{objects.Disk}
1887   @param snap_disks: list of snapshot block devices, which
1888       will be used to get the actual name of the dump file
1889
1890   @rtype: boolean
1891   @return: the success of the operation
1892
1893   """
1894   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1895   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1896
1897   config = objects.SerializableConfigParser()
1898
1899   config.add_section(constants.INISECT_EXP)
1900   config.set(constants.INISECT_EXP, 'version', '0')
1901   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1902   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1903   config.set(constants.INISECT_EXP, 'os', instance.os)
1904   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1905
1906   config.add_section(constants.INISECT_INS)
1907   config.set(constants.INISECT_INS, 'name', instance.name)
1908   config.set(constants.INISECT_INS, 'memory', '%d' %
1909              instance.beparams[constants.BE_MEMORY])
1910   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1911              instance.beparams[constants.BE_VCPUS])
1912   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1913
1914   nic_total = 0
1915   for nic_count, nic in enumerate(instance.nics):
1916     nic_total += 1
1917     config.set(constants.INISECT_INS, 'nic%d_mac' %
1918                nic_count, '%s' % nic.mac)
1919     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1920     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1921                '%s' % nic.bridge)
1922   # TODO: redundant: on load can read nics until it doesn't exist
1923   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1924
1925   disk_total = 0
1926   for disk_count, disk in enumerate(snap_disks):
1927     if disk:
1928       disk_total += 1
1929       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1930                  ('%s' % disk.iv_name))
1931       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1932                  ('%s' % disk.physical_id[1]))
1933       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1934                  ('%d' % disk.size))
1935
1936   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1937
1938   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1939                   data=config.Dumps())
1940   shutil.rmtree(finaldestdir, True)
1941   shutil.move(destdir, finaldestdir)
1942
1943   return True
1944
1945
1946 def ExportInfo(dest):
1947   """Get export configuration information.
1948
1949   @type dest: str
1950   @param dest: directory containing the export
1951
1952   @rtype: L{objects.SerializableConfigParser}
1953   @return: a serializable config file containing the
1954       export info
1955
1956   """
1957   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1958
1959   config = objects.SerializableConfigParser()
1960   config.read(cff)
1961
1962   if (not config.has_section(constants.INISECT_EXP) or
1963       not config.has_section(constants.INISECT_INS)):
1964     return None
1965
1966   return config
1967
1968
1969 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1970   """Import an os image into an instance.
1971
1972   @type instance: L{objects.Instance}
1973   @param instance: instance to import the disks into
1974   @type src_node: string
1975   @param src_node: source node for the disk images
1976   @type src_images: list of string
1977   @param src_images: absolute paths of the disk images
1978   @rtype: list of boolean
1979   @return: each boolean represent the success of importing the n-th disk
1980
1981   """
1982   import_env = OSEnvironment(instance)
1983   inst_os = OSFromDisk(instance.os)
1984   import_script = inst_os.import_script
1985
1986   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1987                                         instance.name, int(time.time()))
1988   if not os.path.exists(constants.LOG_OS_DIR):
1989     os.mkdir(constants.LOG_OS_DIR, 0750)
1990
1991   comprcmd = "gunzip"
1992   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1993                                import_script, logfile)
1994
1995   final_result = []
1996   for idx, image in enumerate(src_images):
1997     if image:
1998       destcmd = utils.BuildShellCmd('cat %s', image)
1999       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2000                                                        constants.GANETI_RUNAS,
2001                                                        destcmd)
2002       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2003       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2004       import_env['IMPORT_INDEX'] = str(idx)
2005       result = utils.RunCmd(command, env=import_env)
2006       if result.failed:
2007         logging.error("Disk import command '%s' returned error: %s"
2008                       " output: %s", command, result.fail_reason,
2009                       result.output)
2010         final_result.append(False)
2011       else:
2012         final_result.append(True)
2013     else:
2014       final_result.append(True)
2015
2016   return final_result
2017
2018
2019 def ListExports():
2020   """Return a list of exports currently available on this machine.
2021
2022   @rtype: list
2023   @return: list of the exports
2024
2025   """
2026   if os.path.isdir(constants.EXPORT_DIR):
2027     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2028   else:
2029     return []
2030
2031
2032 def RemoveExport(export):
2033   """Remove an existing export from the node.
2034
2035   @type export: str
2036   @param export: the name of the export to remove
2037   @rtype: boolean
2038   @return: the success of the operation
2039
2040   """
2041   target = os.path.join(constants.EXPORT_DIR, export)
2042
2043   shutil.rmtree(target)
2044   # TODO: catch some of the relevant exceptions and provide a pretty
2045   # error message if rmtree fails.
2046
2047   return True
2048
2049
2050 def BlockdevRename(devlist):
2051   """Rename a list of block devices.
2052
2053   @type devlist: list of tuples
2054   @param devlist: list of tuples of the form  (disk,
2055       new_logical_id, new_physical_id); disk is an
2056       L{objects.Disk} object describing the current disk,
2057       and new logical_id/physical_id is the name we
2058       rename it to
2059   @rtype: boolean
2060   @return: True if all renames succeeded, False otherwise
2061
2062   """
2063   result = True
2064   for disk, unique_id in devlist:
2065     dev = _RecursiveFindBD(disk)
2066     if dev is None:
2067       result = False
2068       continue
2069     try:
2070       old_rpath = dev.dev_path
2071       dev.Rename(unique_id)
2072       new_rpath = dev.dev_path
2073       if old_rpath != new_rpath:
2074         DevCacheManager.RemoveCache(old_rpath)
2075         # FIXME: we should add the new cache information here, like:
2076         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2077         # but we don't have the owner here - maybe parse from existing
2078         # cache? for now, we only lose lvm data when we rename, which
2079         # is less critical than DRBD or MD
2080     except errors.BlockDeviceError:
2081       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2082       result = False
2083   return result
2084
2085
2086 def _TransformFileStorageDir(file_storage_dir):
2087   """Checks whether given file_storage_dir is valid.
2088
2089   Checks wheter the given file_storage_dir is within the cluster-wide
2090   default file_storage_dir stored in SimpleStore. Only paths under that
2091   directory are allowed.
2092
2093   @type file_storage_dir: str
2094   @param file_storage_dir: the path to check
2095
2096   @return: the normalized path if valid, None otherwise
2097
2098   """
2099   cfg = _GetConfig()
2100   file_storage_dir = os.path.normpath(file_storage_dir)
2101   base_file_storage_dir = cfg.GetFileStorageDir()
2102   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2103       base_file_storage_dir):
2104     logging.error("file storage directory '%s' is not under base file"
2105                   " storage directory '%s'",
2106                   file_storage_dir, base_file_storage_dir)
2107     return None
2108   return file_storage_dir
2109
2110
2111 def CreateFileStorageDir(file_storage_dir):
2112   """Create file storage directory.
2113
2114   @type file_storage_dir: str
2115   @param file_storage_dir: directory to create
2116
2117   @rtype: tuple
2118   @return: tuple with first element a boolean indicating wheter dir
2119       creation was successful or not
2120
2121   """
2122   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2123   result = True,
2124   if not file_storage_dir:
2125     result = False,
2126   else:
2127     if os.path.exists(file_storage_dir):
2128       if not os.path.isdir(file_storage_dir):
2129         logging.error("'%s' is not a directory", file_storage_dir)
2130         result = False,
2131     else:
2132       try:
2133         os.makedirs(file_storage_dir, 0750)
2134       except OSError, err:
2135         logging.error("Cannot create file storage directory '%s': %s",
2136                       file_storage_dir, err)
2137         result = False,
2138   return result
2139
2140
2141 def RemoveFileStorageDir(file_storage_dir):
2142   """Remove file storage directory.
2143
2144   Remove it only if it's empty. If not log an error and return.
2145
2146   @type file_storage_dir: str
2147   @param file_storage_dir: the directory we should cleanup
2148   @rtype: tuple (success,)
2149   @return: tuple of one element, C{success}, denoting
2150       whether the operation was successful
2151
2152   """
2153   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2154   result = True,
2155   if not file_storage_dir:
2156     result = False,
2157   else:
2158     if os.path.exists(file_storage_dir):
2159       if not os.path.isdir(file_storage_dir):
2160         logging.error("'%s' is not a directory", file_storage_dir)
2161         result = False,
2162       # deletes dir only if empty, otherwise we want to return False
2163       try:
2164         os.rmdir(file_storage_dir)
2165       except OSError:
2166         logging.exception("Cannot remove file storage directory '%s'",
2167                           file_storage_dir)
2168         result = False,
2169   return result
2170
2171
2172 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2173   """Rename the file storage directory.
2174
2175   @type old_file_storage_dir: str
2176   @param old_file_storage_dir: the current path
2177   @type new_file_storage_dir: str
2178   @param new_file_storage_dir: the name we should rename to
2179   @rtype: tuple (success,)
2180   @return: tuple of one element, C{success}, denoting
2181       whether the operation was successful
2182
2183   """
2184   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2185   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2186   result = True,
2187   if not old_file_storage_dir or not new_file_storage_dir:
2188     result = False,
2189   else:
2190     if not os.path.exists(new_file_storage_dir):
2191       if os.path.isdir(old_file_storage_dir):
2192         try:
2193           os.rename(old_file_storage_dir, new_file_storage_dir)
2194         except OSError:
2195           logging.exception("Cannot rename '%s' to '%s'",
2196                             old_file_storage_dir, new_file_storage_dir)
2197           result =  False,
2198       else:
2199         logging.error("'%s' is not a directory", old_file_storage_dir)
2200         result = False,
2201     else:
2202       if os.path.exists(old_file_storage_dir):
2203         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2204                       old_file_storage_dir, new_file_storage_dir)
2205         result = False,
2206   return result
2207
2208
2209 def _IsJobQueueFile(file_name):
2210   """Checks whether the given filename is in the queue directory.
2211
2212   @type file_name: str
2213   @param file_name: the file name we should check
2214   @rtype: boolean
2215   @return: whether the file is under the queue directory
2216
2217   """
2218   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2219   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2220
2221   if not result:
2222     logging.error("'%s' is not a file in the queue directory",
2223                   file_name)
2224
2225   return result
2226
2227
2228 def JobQueueUpdate(file_name, content):
2229   """Updates a file in the queue directory.
2230
2231   This is just a wrapper over L{utils.WriteFile}, with proper
2232   checking.
2233
2234   @type file_name: str
2235   @param file_name: the job file name
2236   @type content: str
2237   @param content: the new job contents
2238   @rtype: boolean
2239   @return: the success of the operation
2240
2241   """
2242   if not _IsJobQueueFile(file_name):
2243     return False
2244
2245   # Write and replace the file atomically
2246   utils.WriteFile(file_name, data=_Decompress(content))
2247
2248   return True
2249
2250
2251 def JobQueueRename(old, new):
2252   """Renames a job queue file.
2253
2254   This is just a wrapper over os.rename with proper checking.
2255
2256   @type old: str
2257   @param old: the old (actual) file name
2258   @type new: str
2259   @param new: the desired file name
2260   @rtype: boolean
2261   @return: the success of the operation
2262
2263   """
2264   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2265     return False
2266
2267   utils.RenameFile(old, new, mkdir=True)
2268
2269   return True
2270
2271
2272 def JobQueueSetDrainFlag(drain_flag):
2273   """Set the drain flag for the queue.
2274
2275   This will set or unset the queue drain flag.
2276
2277   @type drain_flag: boolean
2278   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2279   @rtype: boolean
2280   @return: always True
2281   @warning: the function always returns True
2282
2283   """
2284   if drain_flag:
2285     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2286   else:
2287     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2288
2289   return True
2290
2291
2292 def BlockdevClose(instance_name, disks):
2293   """Closes the given block devices.
2294
2295   This means they will be switched to secondary mode (in case of
2296   DRBD).
2297
2298   @param instance_name: if the argument is not empty, the symlinks
2299       of this instance will be removed
2300   @type disks: list of L{objects.Disk}
2301   @param disks: the list of disks to be closed
2302   @rtype: tuple (success, message)
2303   @return: a tuple of success and message, where success
2304       indicates the succes of the operation, and message
2305       which will contain the error details in case we
2306       failed
2307
2308   """
2309   bdevs = []
2310   for cf in disks:
2311     rd = _RecursiveFindBD(cf)
2312     if rd is None:
2313       return (False, "Can't find device %s" % cf)
2314     bdevs.append(rd)
2315
2316   msg = []
2317   for rd in bdevs:
2318     try:
2319       rd.Close()
2320     except errors.BlockDeviceError, err:
2321       msg.append(str(err))
2322   if msg:
2323     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2324   else:
2325     if instance_name:
2326       _RemoveBlockDevLinks(instance_name, disks)
2327     return (True, "All devices secondary")
2328
2329
2330 def ValidateHVParams(hvname, hvparams):
2331   """Validates the given hypervisor parameters.
2332
2333   @type hvname: string
2334   @param hvname: the hypervisor name
2335   @type hvparams: dict
2336   @param hvparams: the hypervisor parameters to be validated
2337   @rtype: tuple (success, message)
2338   @return: a tuple of success and message, where success
2339       indicates the succes of the operation, and message
2340       which will contain the error details in case we
2341       failed
2342
2343   """
2344   try:
2345     hv_type = hypervisor.GetHypervisor(hvname)
2346     hv_type.ValidateParameters(hvparams)
2347     return (True, "Validation passed")
2348   except errors.HypervisorError, err:
2349     return (False, str(err))
2350
2351
2352 def DemoteFromMC():
2353   """Demotes the current node from master candidate role.
2354
2355   """
2356   # try to ensure we're not the master by mistake
2357   master, myself = ssconf.GetMasterAndMyself()
2358   if master == myself:
2359     return (False, "ssconf status shows I'm the master node, will not demote")
2360   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2361   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2362     return (False, "The master daemon is running, will not demote")
2363   try:
2364     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2365       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2366   except EnvironmentError, err:
2367     if err.errno != errno.ENOENT:
2368       return (False, "Error while backing up cluster file: %s" % str(err))
2369   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2370   return (True, "Done")
2371
2372
2373 def _FindDisks(nodes_ip, disks):
2374   """Sets the physical ID on disks and returns the block devices.
2375
2376   """
2377   # set the correct physical ID
2378   my_name = utils.HostInfo().name
2379   for cf in disks:
2380     cf.SetPhysicalID(my_name, nodes_ip)
2381
2382   bdevs = []
2383
2384   for cf in disks:
2385     rd = _RecursiveFindBD(cf)
2386     if rd is None:
2387       return (False, "Can't find device %s" % cf)
2388     bdevs.append(rd)
2389   return (True, bdevs)
2390
2391
2392 def DrbdDisconnectNet(nodes_ip, disks):
2393   """Disconnects the network on a list of drbd devices.
2394
2395   """
2396   status, bdevs = _FindDisks(nodes_ip, disks)
2397   if not status:
2398     return status, bdevs
2399
2400   # disconnect disks
2401   for rd in bdevs:
2402     try:
2403       rd.DisconnectNet()
2404     except errors.BlockDeviceError, err:
2405       logging.exception("Failed to go into standalone mode")
2406       return (False, "Can't change network configuration: %s" % str(err))
2407   return (True, "All disks are now disconnected")
2408
2409
2410 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2411   """Attaches the network on a list of drbd devices.
2412
2413   """
2414   status, bdevs = _FindDisks(nodes_ip, disks)
2415   if not status:
2416     return status, bdevs
2417
2418   if multimaster:
2419     for idx, rd in enumerate(bdevs):
2420       try:
2421         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2422       except EnvironmentError, err:
2423         return (False, "Can't create symlink: %s" % str(err))
2424   # reconnect disks, switch to new master configuration and if
2425   # needed primary mode
2426   for rd in bdevs:
2427     try:
2428       rd.AttachNet(multimaster)
2429     except errors.BlockDeviceError, err:
2430       return (False, "Can't change network configuration: %s" % str(err))
2431   # wait until the disks are connected; we need to retry the re-attach
2432   # if the device becomes standalone, as this might happen if the one
2433   # node disconnects and reconnects in a different mode before the
2434   # other node reconnects; in this case, one or both of the nodes will
2435   # decide it has wrong configuration and switch to standalone
2436   RECONNECT_TIMEOUT = 2 * 60
2437   sleep_time = 0.100 # start with 100 miliseconds
2438   timeout_limit = time.time() + RECONNECT_TIMEOUT
2439   while time.time() < timeout_limit:
2440     all_connected = True
2441     for rd in bdevs:
2442       stats = rd.GetProcStatus()
2443       if not (stats.is_connected or stats.is_in_resync):
2444         all_connected = False
2445       if stats.is_standalone:
2446         # peer had different config info and this node became
2447         # standalone, even though this should not happen with the
2448         # new staged way of changing disk configs
2449         try:
2450           rd.AttachNet(multimaster)
2451         except errors.BlockDeviceError, err:
2452           return (False, "Can't change network configuration: %s" % str(err))
2453     if all_connected:
2454       break
2455     time.sleep(sleep_time)
2456     sleep_time = min(5, sleep_time * 1.5)
2457   if not all_connected:
2458     return (False, "Timeout in disk reconnecting")
2459   if multimaster:
2460     # change to primary mode
2461     for rd in bdevs:
2462       try:
2463         rd.Open()
2464       except errors.BlockDeviceError, err:
2465         return (False, "Can't change to primary mode: %s" % str(err))
2466   if multimaster:
2467     msg = "multi-master and primary"
2468   else:
2469     msg = "single-master"
2470   return (True, "Disks are now configured as %s" % msg)
2471
2472
2473 def DrbdWaitSync(nodes_ip, disks):
2474   """Wait until DRBDs have synchronized.
2475
2476   """
2477   status, bdevs = _FindDisks(nodes_ip, disks)
2478   if not status:
2479     return status, bdevs
2480
2481   min_resync = 100
2482   alldone = True
2483   failure = False
2484   for rd in bdevs:
2485     stats = rd.GetProcStatus()
2486     if not (stats.is_connected or stats.is_in_resync):
2487       failure = True
2488       break
2489     alldone = alldone and (not stats.is_in_resync)
2490     if stats.sync_percent is not None:
2491       min_resync = min(min_resync, stats.sync_percent)
2492   return (not failure, (alldone, min_resync))
2493
2494
2495 class HooksRunner(object):
2496   """Hook runner.
2497
2498   This class is instantiated on the node side (ganeti-noded) and not
2499   on the master side.
2500
2501   """
2502   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2503
2504   def __init__(self, hooks_base_dir=None):
2505     """Constructor for hooks runner.
2506
2507     @type hooks_base_dir: str or None
2508     @param hooks_base_dir: if not None, this overrides the
2509         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2510
2511     """
2512     if hooks_base_dir is None:
2513       hooks_base_dir = constants.HOOKS_BASE_DIR
2514     self._BASE_DIR = hooks_base_dir
2515
2516   @staticmethod
2517   def ExecHook(script, env):
2518     """Exec one hook script.
2519
2520     @type script: str
2521     @param script: the full path to the script
2522     @type env: dict
2523     @param env: the environment with which to exec the script
2524     @rtype: tuple (success, message)
2525     @return: a tuple of success and message, where success
2526         indicates the succes of the operation, and message
2527         which will contain the error details in case we
2528         failed
2529
2530     """
2531     # exec the process using subprocess and log the output
2532     fdstdin = None
2533     try:
2534       fdstdin = open("/dev/null", "r")
2535       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2536                                stderr=subprocess.STDOUT, close_fds=True,
2537                                shell=False, cwd="/", env=env)
2538       output = ""
2539       try:
2540         output = child.stdout.read(4096)
2541         child.stdout.close()
2542       except EnvironmentError, err:
2543         output += "Hook script error: %s" % str(err)
2544
2545       while True:
2546         try:
2547           result = child.wait()
2548           break
2549         except EnvironmentError, err:
2550           if err.errno == errno.EINTR:
2551             continue
2552           raise
2553     finally:
2554       # try not to leak fds
2555       for fd in (fdstdin, ):
2556         if fd is not None:
2557           try:
2558             fd.close()
2559           except EnvironmentError, err:
2560             # just log the error
2561             #logging.exception("Error while closing fd %s", fd)
2562             pass
2563
2564     return result == 0, utils.SafeEncode(output.strip())
2565
2566   def RunHooks(self, hpath, phase, env):
2567     """Run the scripts in the hooks directory.
2568
2569     @type hpath: str
2570     @param hpath: the path to the hooks directory which
2571         holds the scripts
2572     @type phase: str
2573     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2574         L{constants.HOOKS_PHASE_POST}
2575     @type env: dict
2576     @param env: dictionary with the environment for the hook
2577     @rtype: list
2578     @return: list of 3-element tuples:
2579       - script path
2580       - script result, either L{constants.HKR_SUCCESS} or
2581         L{constants.HKR_FAIL}
2582       - output of the script
2583
2584     @raise errors.ProgrammerError: for invalid input
2585         parameters
2586
2587     """
2588     if phase == constants.HOOKS_PHASE_PRE:
2589       suffix = "pre"
2590     elif phase == constants.HOOKS_PHASE_POST:
2591       suffix = "post"
2592     else:
2593       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2594     rr = []
2595
2596     subdir = "%s-%s.d" % (hpath, suffix)
2597     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2598     try:
2599       dir_contents = utils.ListVisibleFiles(dir_name)
2600     except OSError:
2601       # FIXME: must log output in case of failures
2602       return rr
2603
2604     # we use the standard python sort order,
2605     # so 00name is the recommended naming scheme
2606     dir_contents.sort()
2607     for relname in dir_contents:
2608       fname = os.path.join(dir_name, relname)
2609       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2610           self.RE_MASK.match(relname) is not None):
2611         rrval = constants.HKR_SKIP
2612         output = ""
2613       else:
2614         result, output = self.ExecHook(fname, env)
2615         if not result:
2616           rrval = constants.HKR_FAIL
2617         else:
2618           rrval = constants.HKR_SUCCESS
2619       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2620
2621     return rr
2622
2623
2624 class IAllocatorRunner(object):
2625   """IAllocator runner.
2626
2627   This class is instantiated on the node side (ganeti-noded) and not on
2628   the master side.
2629
2630   """
2631   def Run(self, name, idata):
2632     """Run an iallocator script.
2633
2634     @type name: str
2635     @param name: the iallocator script name
2636     @type idata: str
2637     @param idata: the allocator input data
2638
2639     @rtype: tuple
2640     @return: four element tuple of:
2641        - run status (one of the IARUN_ constants)
2642        - stdout
2643        - stderr
2644        - fail reason (as from L{utils.RunResult})
2645
2646     """
2647     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2648                                   os.path.isfile)
2649     if alloc_script is None:
2650       return (constants.IARUN_NOTFOUND, None, None, None)
2651
2652     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2653     try:
2654       os.write(fd, idata)
2655       os.close(fd)
2656       result = utils.RunCmd([alloc_script, fin_name])
2657       if result.failed:
2658         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2659                 result.fail_reason)
2660     finally:
2661       os.unlink(fin_name)
2662
2663     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2664
2665
2666 class DevCacheManager(object):
2667   """Simple class for managing a cache of block device information.
2668
2669   """
2670   _DEV_PREFIX = "/dev/"
2671   _ROOT_DIR = constants.BDEV_CACHE_DIR
2672
2673   @classmethod
2674   def _ConvertPath(cls, dev_path):
2675     """Converts a /dev/name path to the cache file name.
2676
2677     This replaces slashes with underscores and strips the /dev
2678     prefix. It then returns the full path to the cache file.
2679
2680     @type dev_path: str
2681     @param dev_path: the C{/dev/} path name
2682     @rtype: str
2683     @return: the converted path name
2684
2685     """
2686     if dev_path.startswith(cls._DEV_PREFIX):
2687       dev_path = dev_path[len(cls._DEV_PREFIX):]
2688     dev_path = dev_path.replace("/", "_")
2689     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2690     return fpath
2691
2692   @classmethod
2693   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2694     """Updates the cache information for a given device.
2695
2696     @type dev_path: str
2697     @param dev_path: the pathname of the device
2698     @type owner: str
2699     @param owner: the owner (instance name) of the device
2700     @type on_primary: bool
2701     @param on_primary: whether this is the primary
2702         node nor not
2703     @type iv_name: str
2704     @param iv_name: the instance-visible name of the
2705         device, as in objects.Disk.iv_name
2706
2707     @rtype: None
2708
2709     """
2710     if dev_path is None:
2711       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2712       return
2713     fpath = cls._ConvertPath(dev_path)
2714     if on_primary:
2715       state = "primary"
2716     else:
2717       state = "secondary"
2718     if iv_name is None:
2719       iv_name = "not_visible"
2720     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2721     try:
2722       utils.WriteFile(fpath, data=fdata)
2723     except EnvironmentError:
2724       logging.exception("Can't update bdev cache for %s", dev_path)
2725
2726   @classmethod
2727   def RemoveCache(cls, dev_path):
2728     """Remove data for a dev_path.
2729
2730     This is just a wrapper over L{utils.RemoveFile} with a converted
2731     path name and logging.
2732
2733     @type dev_path: str
2734     @param dev_path: the pathname of the device
2735
2736     @rtype: None
2737
2738     """
2739     if dev_path is None:
2740       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2741       return
2742     fpath = cls._ConvertPath(dev_path)
2743     try:
2744       utils.RemoveFile(fpath)
2745     except EnvironmentError:
2746       logging.exception("Can't update bdev cache for %s", dev_path)