Convert node_stop_master rpc to new style result
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 class RPCFail(Exception):
50   """Class denoting RPC failure.
51
52   Its argument is the error message.
53
54   """
55
56 def _Fail(msg, *args, **kwargs):
57   """Log an error and the raise an RPCFail exception.
58
59   This exception is then handled specially in the ganeti daemon and
60   turned into a 'failed' return type. As such, this function is a
61   useful shortcut for logging the error and returning it to the master
62   daemon.
63
64   @type msg: string
65   @param msg: the text of the exception
66   @raise RPCFail
67
68   """
69   if args:
70     msg = msg % args
71   if "exc" in kwargs and kwargs["exc"]:
72     logging.exception(msg)
73   else:
74     logging.error(msg)
75   raise RPCFail(msg)
76
77
78 def _GetConfig():
79   """Simple wrapper to return a SimpleStore.
80
81   @rtype: L{ssconf.SimpleStore}
82   @return: a SimpleStore instance
83
84   """
85   return ssconf.SimpleStore()
86
87
88 def _GetSshRunner(cluster_name):
89   """Simple wrapper to return an SshRunner.
90
91   @type cluster_name: str
92   @param cluster_name: the cluster name, which is needed
93       by the SshRunner constructor
94   @rtype: L{ssh.SshRunner}
95   @return: an SshRunner instance
96
97   """
98   return ssh.SshRunner(cluster_name)
99
100
101 def _Decompress(data):
102   """Unpacks data compressed by the RPC client.
103
104   @type data: list or tuple
105   @param data: Data sent by RPC client
106   @rtype: str
107   @return: Decompressed data
108
109   """
110   assert isinstance(data, (list, tuple))
111   assert len(data) == 2
112   (encoding, content) = data
113   if encoding == constants.RPC_ENCODING_NONE:
114     return content
115   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
116     return zlib.decompress(base64.b64decode(content))
117   else:
118     raise AssertionError("Unknown data encoding")
119
120
121 def _CleanDirectory(path, exclude=None):
122   """Removes all regular files in a directory.
123
124   @type path: str
125   @param path: the directory to clean
126   @type exclude: list
127   @param exclude: list of files to be excluded, defaults
128       to the empty list
129
130   """
131   if not os.path.isdir(path):
132     return
133   if exclude is None:
134     exclude = []
135   else:
136     # Normalize excluded paths
137     exclude = [os.path.normpath(i) for i in exclude]
138
139   for rel_name in utils.ListVisibleFiles(path):
140     full_name = os.path.normpath(os.path.join(path, rel_name))
141     if full_name in exclude:
142       continue
143     if os.path.isfile(full_name) and not os.path.islink(full_name):
144       utils.RemoveFile(full_name)
145
146
147 def JobQueuePurge():
148   """Removes job queue files and archived jobs.
149
150   @rtype: None
151
152   """
153   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
154   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
155
156
157 def GetMasterInfo():
158   """Returns master information.
159
160   This is an utility function to compute master information, either
161   for consumption here or from the node daemon.
162
163   @rtype: tuple
164   @return: (master_netdev, master_ip, master_name) if we have a good
165       configuration, otherwise (None, None, None)
166
167   """
168   try:
169     cfg = _GetConfig()
170     master_netdev = cfg.GetMasterNetdev()
171     master_ip = cfg.GetMasterIP()
172     master_node = cfg.GetMasterNode()
173   except errors.ConfigurationError, err:
174     logging.exception("Cluster configuration incomplete")
175     return (None, None, None)
176   return (master_netdev, master_ip, master_node)
177
178
179 def StartMaster(start_daemons):
180   """Activate local node as master node.
181
182   The function will always try activate the IP address of the master
183   (unless someone else has it). It will also start the master daemons,
184   based on the start_daemons parameter.
185
186   @type start_daemons: boolean
187   @param start_daemons: whther to also start the master
188       daemons (ganeti-masterd and ganeti-rapi)
189   @rtype: None
190
191   """
192   master_netdev, master_ip, _ = GetMasterInfo()
193   if not master_netdev:
194     return False, "Cluster configuration incomplete, cannot read ssconf files"
195
196   payload = []
197   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
198     if utils.OwnIpAddress(master_ip):
199       # we already have the ip:
200       logging.debug("Master IP already configured, doing nothing")
201     else:
202       msg = "Someone else has the master ip, not activating"
203       logging.error(msg)
204       payload.append(msg)
205   else:
206     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
207                            "dev", master_netdev, "label",
208                            "%s:0" % master_netdev])
209     if result.failed:
210       msg = "Can't activate master IP: %s" % result.output
211       logging.error(msg)
212       payload.append(msg)
213
214     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
215                            "-s", master_ip, master_ip])
216     # we'll ignore the exit code of arping
217
218   # and now start the master and rapi daemons
219   if start_daemons:
220     for daemon in 'ganeti-masterd', 'ganeti-rapi':
221       result = utils.RunCmd([daemon])
222       if result.failed:
223         msg = "Can't start daemon %s: %s" % (daemon, result.output)
224         logging.error(msg)
225         payload.append(msg)
226
227   return not payload, "; ".join(payload)
228
229
230 def StopMaster(stop_daemons):
231   """Deactivate this node as master.
232
233   The function will always try to deactivate the IP address of the
234   master. It will also stop the master daemons depending on the
235   stop_daemons parameter.
236
237   @type stop_daemons: boolean
238   @param stop_daemons: whether to also stop the master daemons
239       (ganeti-masterd and ganeti-rapi)
240   @rtype: None
241
242   """
243   # TODO: log and report back to the caller the error failures; we
244   # need to decide in which case we fail the RPC for this
245   master_netdev, master_ip, _ = GetMasterInfo()
246   if not master_netdev:
247     return False, "Cluster configuration incomplete, cannot read ssconf files"
248
249   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
250                          "dev", master_netdev])
251   if result.failed:
252     logging.error("Can't remove the master IP, error: %s", result.output)
253     # but otherwise ignore the failure
254
255   if stop_daemons:
256     # stop/kill the rapi and the master daemon
257     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
258       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
259
260   return True, None
261
262
263 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
264   """Joins this node to the cluster.
265
266   This does the following:
267       - updates the hostkeys of the machine (rsa and dsa)
268       - adds the ssh private key to the user
269       - adds the ssh public key to the users' authorized_keys file
270
271   @type dsa: str
272   @param dsa: the DSA private key to write
273   @type dsapub: str
274   @param dsapub: the DSA public key to write
275   @type rsa: str
276   @param rsa: the RSA private key to write
277   @type rsapub: str
278   @param rsapub: the RSA public key to write
279   @type sshkey: str
280   @param sshkey: the SSH private key to write
281   @type sshpub: str
282   @param sshpub: the SSH public key to write
283   @rtype: boolean
284   @return: the success of the operation
285
286   """
287   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
288                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
289                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
290                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
291   for name, content, mode in sshd_keys:
292     utils.WriteFile(name, data=content, mode=mode)
293
294   try:
295     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
296                                                     mkdir=True)
297   except errors.OpExecError, err:
298     _Fail("Error while processing user ssh files: %s", err, exc=True)
299
300   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
301     utils.WriteFile(name, data=content, mode=0600)
302
303   utils.AddAuthorizedKey(auth_keys, sshpub)
304
305   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
306
307   return (True, "Node added successfully")
308
309
310 def LeaveCluster():
311   """Cleans up and remove the current node.
312
313   This function cleans up and prepares the current node to be removed
314   from the cluster.
315
316   If processing is successful, then it raises an
317   L{errors.QuitGanetiException} which is used as a special case to
318   shutdown the node daemon.
319
320   """
321   _CleanDirectory(constants.DATA_DIR)
322   JobQueuePurge()
323
324   try:
325     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
326   except errors.OpExecError:
327     logging.exception("Error while processing ssh files")
328     return
329
330   f = open(pub_key, 'r')
331   try:
332     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
333   finally:
334     f.close()
335
336   utils.RemoveFile(priv_key)
337   utils.RemoveFile(pub_key)
338
339   # Return a reassuring string to the caller, and quit
340   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
341
342
343 def GetNodeInfo(vgname, hypervisor_type):
344   """Gives back a hash with different informations about the node.
345
346   @type vgname: C{string}
347   @param vgname: the name of the volume group to ask for disk space information
348   @type hypervisor_type: C{str}
349   @param hypervisor_type: the name of the hypervisor to ask for
350       memory information
351   @rtype: C{dict}
352   @return: dictionary with the following keys:
353       - vg_size is the size of the configured volume group in MiB
354       - vg_free is the free size of the volume group in MiB
355       - memory_dom0 is the memory allocated for domain0 in MiB
356       - memory_free is the currently available (free) ram in MiB
357       - memory_total is the total number of ram in MiB
358
359   """
360   outputarray = {}
361   vginfo = _GetVGInfo(vgname)
362   outputarray['vg_size'] = vginfo['vg_size']
363   outputarray['vg_free'] = vginfo['vg_free']
364
365   hyper = hypervisor.GetHypervisor(hypervisor_type)
366   hyp_info = hyper.GetNodeInfo()
367   if hyp_info is not None:
368     outputarray.update(hyp_info)
369
370   f = open("/proc/sys/kernel/random/boot_id", 'r')
371   try:
372     outputarray["bootid"] = f.read(128).rstrip("\n")
373   finally:
374     f.close()
375
376   return True, outputarray
377
378
379 def VerifyNode(what, cluster_name):
380   """Verify the status of the local node.
381
382   Based on the input L{what} parameter, various checks are done on the
383   local node.
384
385   If the I{filelist} key is present, this list of
386   files is checksummed and the file/checksum pairs are returned.
387
388   If the I{nodelist} key is present, we check that we have
389   connectivity via ssh with the target nodes (and check the hostname
390   report).
391
392   If the I{node-net-test} key is present, we check that we have
393   connectivity to the given nodes via both primary IP and, if
394   applicable, secondary IPs.
395
396   @type what: C{dict}
397   @param what: a dictionary of things to check:
398       - filelist: list of files for which to compute checksums
399       - nodelist: list of nodes we should check ssh communication with
400       - node-net-test: list of nodes we should check node daemon port
401         connectivity with
402       - hypervisor: list with hypervisors to run the verify for
403   @rtype: dict
404   @return: a dictionary with the same keys as the input dict, and
405       values representing the result of the checks
406
407   """
408   result = {}
409
410   if constants.NV_HYPERVISOR in what:
411     result[constants.NV_HYPERVISOR] = tmp = {}
412     for hv_name in what[constants.NV_HYPERVISOR]:
413       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
414
415   if constants.NV_FILELIST in what:
416     result[constants.NV_FILELIST] = utils.FingerprintFiles(
417       what[constants.NV_FILELIST])
418
419   if constants.NV_NODELIST in what:
420     result[constants.NV_NODELIST] = tmp = {}
421     random.shuffle(what[constants.NV_NODELIST])
422     for node in what[constants.NV_NODELIST]:
423       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
424       if not success:
425         tmp[node] = message
426
427   if constants.NV_NODENETTEST in what:
428     result[constants.NV_NODENETTEST] = tmp = {}
429     my_name = utils.HostInfo().name
430     my_pip = my_sip = None
431     for name, pip, sip in what[constants.NV_NODENETTEST]:
432       if name == my_name:
433         my_pip = pip
434         my_sip = sip
435         break
436     if not my_pip:
437       tmp[my_name] = ("Can't find my own primary/secondary IP"
438                       " in the node list")
439     else:
440       port = utils.GetNodeDaemonPort()
441       for name, pip, sip in what[constants.NV_NODENETTEST]:
442         fail = []
443         if not utils.TcpPing(pip, port, source=my_pip):
444           fail.append("primary")
445         if sip != pip:
446           if not utils.TcpPing(sip, port, source=my_sip):
447             fail.append("secondary")
448         if fail:
449           tmp[name] = ("failure using the %s interface(s)" %
450                        " and ".join(fail))
451
452   if constants.NV_LVLIST in what:
453     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
454
455   if constants.NV_INSTANCELIST in what:
456     result[constants.NV_INSTANCELIST] = GetInstanceList(
457       what[constants.NV_INSTANCELIST])
458
459   if constants.NV_VGLIST in what:
460     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
461
462   if constants.NV_VERSION in what:
463     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
464                                     constants.RELEASE_VERSION)
465
466   if constants.NV_HVINFO in what:
467     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
468     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
469
470   if constants.NV_DRBDLIST in what:
471     try:
472       used_minors = bdev.DRBD8.GetUsedDevs().keys()
473     except errors.BlockDeviceError, err:
474       logging.warning("Can't get used minors list", exc_info=True)
475       used_minors = str(err)
476     result[constants.NV_DRBDLIST] = used_minors
477
478   return True, result
479
480
481 def GetVolumeList(vg_name):
482   """Compute list of logical volumes and their size.
483
484   @type vg_name: str
485   @param vg_name: the volume group whose LVs we should list
486   @rtype: dict
487   @return:
488       dictionary of all partions (key) with value being a tuple of
489       their size (in MiB), inactive and online status::
490
491         {'test1': ('20.06', True, True)}
492
493       in case of errors, a string is returned with the error
494       details.
495
496   """
497   lvs = {}
498   sep = '|'
499   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
500                          "--separator=%s" % sep,
501                          "-olv_name,lv_size,lv_attr", vg_name])
502   if result.failed:
503     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
504
505   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
506   for line in result.stdout.splitlines():
507     line = line.strip()
508     match = valid_line_re.match(line)
509     if not match:
510       logging.error("Invalid line returned from lvs output: '%s'", line)
511       continue
512     name, size, attr = match.groups()
513     inactive = attr[4] == '-'
514     online = attr[5] == 'o'
515     lvs[name] = (size, inactive, online)
516
517   return lvs
518
519
520 def ListVolumeGroups():
521   """List the volume groups and their size.
522
523   @rtype: dict
524   @return: dictionary with keys volume name and values the
525       size of the volume
526
527   """
528   return True, utils.ListVolumeGroups()
529
530
531 def NodeVolumes():
532   """List all volumes on this node.
533
534   @rtype: list
535   @return:
536     A list of dictionaries, each having four keys:
537       - name: the logical volume name,
538       - size: the size of the logical volume
539       - dev: the physical device on which the LV lives
540       - vg: the volume group to which it belongs
541
542     In case of errors, we return an empty list and log the
543     error.
544
545     Note that since a logical volume can live on multiple physical
546     volumes, the resulting list might include a logical volume
547     multiple times.
548
549   """
550   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
551                          "--separator=|",
552                          "--options=lv_name,lv_size,devices,vg_name"])
553   if result.failed:
554     logging.error("Failed to list logical volumes, lvs output: %s",
555                   result.output)
556     return []
557
558   def parse_dev(dev):
559     if '(' in dev:
560       return dev.split('(')[0]
561     else:
562       return dev
563
564   def map_line(line):
565     return {
566       'name': line[0].strip(),
567       'size': line[1].strip(),
568       'dev': parse_dev(line[2].strip()),
569       'vg': line[3].strip(),
570     }
571
572   return [map_line(line.split('|')) for line in result.stdout.splitlines()
573           if line.count('|') >= 3]
574
575
576 def BridgesExist(bridges_list):
577   """Check if a list of bridges exist on the current node.
578
579   @rtype: boolean
580   @return: C{True} if all of them exist, C{False} otherwise
581
582   """
583   missing = []
584   for bridge in bridges_list:
585     if not utils.BridgeExists(bridge):
586       missing.append(bridge)
587
588   if missing:
589     return False, "Missing bridges %s" % (", ".join(missing),)
590
591   return True, None
592
593
594 def GetInstanceList(hypervisor_list):
595   """Provides a list of instances.
596
597   @type hypervisor_list: list
598   @param hypervisor_list: the list of hypervisors to query information
599
600   @rtype: list
601   @return: a list of all running instances on the current node
602     - instance1.example.com
603     - instance2.example.com
604
605   """
606   results = []
607   for hname in hypervisor_list:
608     try:
609       names = hypervisor.GetHypervisor(hname).ListInstances()
610       results.extend(names)
611     except errors.HypervisorError, err:
612       _Fail("Error enumerating instances (hypervisor %s): %s",
613             hname, err, exc=True)
614
615   return results
616
617
618 def GetInstanceInfo(instance, hname):
619   """Gives back the informations about an instance as a dictionary.
620
621   @type instance: string
622   @param instance: the instance name
623   @type hname: string
624   @param hname: the hypervisor type of the instance
625
626   @rtype: dict
627   @return: dictionary with the following keys:
628       - memory: memory size of instance (int)
629       - state: xen state of instance (string)
630       - time: cpu time of instance (float)
631
632   """
633   output = {}
634
635   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
636   if iinfo is not None:
637     output['memory'] = iinfo[2]
638     output['state'] = iinfo[4]
639     output['time'] = iinfo[5]
640
641   return True, output
642
643
644 def GetInstanceMigratable(instance):
645   """Gives whether an instance can be migrated.
646
647   @type instance: L{objects.Instance}
648   @param instance: object representing the instance to be checked.
649
650   @rtype: tuple
651   @return: tuple of (result, description) where:
652       - result: whether the instance can be migrated or not
653       - description: a description of the issue, if relevant
654
655   """
656   hyper = hypervisor.GetHypervisor(instance.hypervisor)
657   if instance.name not in hyper.ListInstances():
658     return (False, 'not running')
659
660   for idx in range(len(instance.disks)):
661     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
662     if not os.path.islink(link_name):
663       return (False, 'not restarted since ganeti 1.2.5')
664
665   return (True, '')
666
667
668 def GetAllInstancesInfo(hypervisor_list):
669   """Gather data about all instances.
670
671   This is the equivalent of L{GetInstanceInfo}, except that it
672   computes data for all instances at once, thus being faster if one
673   needs data about more than one instance.
674
675   @type hypervisor_list: list
676   @param hypervisor_list: list of hypervisors to query for instance data
677
678   @rtype: dict
679   @return: dictionary of instance: data, with data having the following keys:
680       - memory: memory size of instance (int)
681       - state: xen state of instance (string)
682       - time: cpu time of instance (float)
683       - vcpus: the number of vcpus
684
685   """
686   output = {}
687
688   for hname in hypervisor_list:
689     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
690     if iinfo:
691       for name, inst_id, memory, vcpus, state, times in iinfo:
692         value = {
693           'memory': memory,
694           'vcpus': vcpus,
695           'state': state,
696           'time': times,
697           }
698         if name in output:
699           # we only check static parameters, like memory and vcpus,
700           # and not state and time which can change between the
701           # invocations of the different hypervisors
702           for key in 'memory', 'vcpus':
703             if value[key] != output[name][key]:
704               _Fail("Instance %s is running twice"
705                     " with different parameters", name)
706         output[name] = value
707
708   return True, output
709
710
711 def InstanceOsAdd(instance, reinstall):
712   """Add an OS to an instance.
713
714   @type instance: L{objects.Instance}
715   @param instance: Instance whose OS is to be installed
716   @type reinstall: boolean
717   @param reinstall: whether this is an instance reinstall
718   @rtype: boolean
719   @return: the success of the operation
720
721   """
722   try:
723     inst_os = OSFromDisk(instance.os)
724   except errors.InvalidOS, err:
725     os_name, os_dir, os_err = err.args
726     if os_dir is None:
727       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
728     else:
729       return (False, "Error parsing OS '%s' in directory %s: %s" %
730               (os_name, os_dir, os_err))
731
732   create_env = OSEnvironment(instance)
733   if reinstall:
734     create_env['INSTANCE_REINSTALL'] = "1"
735
736   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
737                                      instance.name, int(time.time()))
738
739   result = utils.RunCmd([inst_os.create_script], env=create_env,
740                         cwd=inst_os.path, output=logfile,)
741   if result.failed:
742     logging.error("os create command '%s' returned error: %s, logfile: %s,"
743                   " output: %s", result.cmd, result.fail_reason, logfile,
744                   result.output)
745     lines = [utils.SafeEncode(val)
746              for val in utils.TailFile(logfile, lines=20)]
747     return (False, "OS create script failed (%s), last lines in the"
748             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
749
750   return (True, "Successfully installed")
751
752
753 def RunRenameInstance(instance, old_name):
754   """Run the OS rename script for an instance.
755
756   @type instance: L{objects.Instance}
757   @param instance: Instance whose OS is to be installed
758   @type old_name: string
759   @param old_name: previous instance name
760   @rtype: boolean
761   @return: the success of the operation
762
763   """
764   inst_os = OSFromDisk(instance.os)
765
766   rename_env = OSEnvironment(instance)
767   rename_env['OLD_INSTANCE_NAME'] = old_name
768
769   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
770                                            old_name,
771                                            instance.name, int(time.time()))
772
773   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
774                         cwd=inst_os.path, output=logfile)
775
776   if result.failed:
777     logging.error("os create command '%s' returned error: %s output: %s",
778                   result.cmd, result.fail_reason, result.output)
779     lines = [utils.SafeEncode(val)
780              for val in utils.TailFile(logfile, lines=20)]
781     return (False, "OS rename script failed (%s), last lines in the"
782             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
783
784   return (True, "Rename successful")
785
786
787 def _GetVGInfo(vg_name):
788   """Get informations about the volume group.
789
790   @type vg_name: str
791   @param vg_name: the volume group which we query
792   @rtype: dict
793   @return:
794     A dictionary with the following keys:
795       - C{vg_size} is the total size of the volume group in MiB
796       - C{vg_free} is the free size of the volume group in MiB
797       - C{pv_count} are the number of physical disks in that VG
798
799     If an error occurs during gathering of data, we return the same dict
800     with keys all set to None.
801
802   """
803   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
804
805   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
806                          "--nosuffix", "--units=m", "--separator=:", vg_name])
807
808   if retval.failed:
809     logging.error("volume group %s not present", vg_name)
810     return retdic
811   valarr = retval.stdout.strip().rstrip(':').split(':')
812   if len(valarr) == 3:
813     try:
814       retdic = {
815         "vg_size": int(round(float(valarr[0]), 0)),
816         "vg_free": int(round(float(valarr[1]), 0)),
817         "pv_count": int(valarr[2]),
818         }
819     except ValueError, err:
820       logging.exception("Fail to parse vgs output")
821   else:
822     logging.error("vgs output has the wrong number of fields (expected"
823                   " three): %s", str(valarr))
824   return retdic
825
826
827 def _GetBlockDevSymlinkPath(instance_name, idx):
828   return os.path.join(constants.DISK_LINKS_DIR,
829                       "%s:%d" % (instance_name, idx))
830
831
832 def _SymlinkBlockDev(instance_name, device_path, idx):
833   """Set up symlinks to a instance's block device.
834
835   This is an auxiliary function run when an instance is start (on the primary
836   node) or when an instance is migrated (on the target node).
837
838
839   @param instance_name: the name of the target instance
840   @param device_path: path of the physical block device, on the node
841   @param idx: the disk index
842   @return: absolute path to the disk's symlink
843
844   """
845   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
846   try:
847     os.symlink(device_path, link_name)
848   except OSError, err:
849     if err.errno == errno.EEXIST:
850       if (not os.path.islink(link_name) or
851           os.readlink(link_name) != device_path):
852         os.remove(link_name)
853         os.symlink(device_path, link_name)
854     else:
855       raise
856
857   return link_name
858
859
860 def _RemoveBlockDevLinks(instance_name, disks):
861   """Remove the block device symlinks belonging to the given instance.
862
863   """
864   for idx, disk in enumerate(disks):
865     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
866     if os.path.islink(link_name):
867       try:
868         os.remove(link_name)
869       except OSError:
870         logging.exception("Can't remove symlink '%s'", link_name)
871
872
873 def _GatherAndLinkBlockDevs(instance):
874   """Set up an instance's block device(s).
875
876   This is run on the primary node at instance startup. The block
877   devices must be already assembled.
878
879   @type instance: L{objects.Instance}
880   @param instance: the instance whose disks we shoul assemble
881   @rtype: list
882   @return: list of (disk_object, device_path)
883
884   """
885   block_devices = []
886   for idx, disk in enumerate(instance.disks):
887     device = _RecursiveFindBD(disk)
888     if device is None:
889       raise errors.BlockDeviceError("Block device '%s' is not set up." %
890                                     str(disk))
891     device.Open()
892     try:
893       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
894     except OSError, e:
895       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
896                                     e.strerror)
897
898     block_devices.append((disk, link_name))
899
900   return block_devices
901
902
903 def StartInstance(instance):
904   """Start an instance.
905
906   @type instance: L{objects.Instance}
907   @param instance: the instance object
908   @rtype: boolean
909   @return: whether the startup was successful or not
910
911   """
912   running_instances = GetInstanceList([instance.hypervisor])
913
914   if instance.name in running_instances:
915     return (True, "Already running")
916
917   try:
918     block_devices = _GatherAndLinkBlockDevs(instance)
919     hyper = hypervisor.GetHypervisor(instance.hypervisor)
920     hyper.StartInstance(instance, block_devices)
921   except errors.BlockDeviceError, err:
922     _Fail("Block device error: %s", err, exc=True)
923   except errors.HypervisorError, err:
924     _RemoveBlockDevLinks(instance.name, instance.disks)
925     _Fail("Hypervisor error: %s", err, exc=True)
926
927   return (True, "Instance started successfully")
928
929
930 def InstanceShutdown(instance):
931   """Shut an instance down.
932
933   @note: this functions uses polling with a hardcoded timeout.
934
935   @type instance: L{objects.Instance}
936   @param instance: the instance object
937   @rtype: boolean
938   @return: whether the startup was successful or not
939
940   """
941   hv_name = instance.hypervisor
942   running_instances = GetInstanceList([hv_name])
943
944   if instance.name not in running_instances:
945     return (True, "Instance already stopped")
946
947   hyper = hypervisor.GetHypervisor(hv_name)
948   try:
949     hyper.StopInstance(instance)
950   except errors.HypervisorError, err:
951     _Fail("Failed to stop instance %s: %s", instance.name, err)
952
953   # test every 10secs for 2min
954
955   time.sleep(1)
956   for dummy in range(11):
957     if instance.name not in GetInstanceList([hv_name]):
958       break
959     time.sleep(10)
960   else:
961     # the shutdown did not succeed
962     logging.error("Shutdown of '%s' unsuccessful, using destroy",
963                   instance.name)
964
965     try:
966       hyper.StopInstance(instance, force=True)
967     except errors.HypervisorError, err:
968       _Fail("Failed to force stop instance %s: %s", instance.name, err)
969
970     time.sleep(1)
971     if instance.name in GetInstanceList([hv_name]):
972       _Fail("Could not shutdown instance %s even by destroy", instance.name)
973
974   _RemoveBlockDevLinks(instance.name, instance.disks)
975
976   return (True, "Instance has been shutdown successfully")
977
978
979 def InstanceReboot(instance, reboot_type):
980   """Reboot an instance.
981
982   @type instance: L{objects.Instance}
983   @param instance: the instance object to reboot
984   @type reboot_type: str
985   @param reboot_type: the type of reboot, one the following
986     constants:
987       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
988         instance OS, do not recreate the VM
989       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
990         restart the VM (at the hypervisor level)
991       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
992         is not accepted here, since that mode is handled
993         differently
994   @rtype: boolean
995   @return: the success of the operation
996
997   """
998   running_instances = GetInstanceList([instance.hypervisor])
999
1000   if instance.name not in running_instances:
1001     _Fail("Cannot reboot instance %s that is not running", instance.name)
1002
1003   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1004   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1005     try:
1006       hyper.RebootInstance(instance)
1007     except errors.HypervisorError, err:
1008       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1009   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1010     try:
1011       stop_result = InstanceShutdown(instance)
1012       if not stop_result[0]:
1013         return stop_result
1014       return StartInstance(instance)
1015     except errors.HypervisorError, err:
1016       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1017   else:
1018     _Fail("Invalid reboot_type received: %s", reboot_type)
1019
1020   return (True, "Reboot successful")
1021
1022
1023 def MigrationInfo(instance):
1024   """Gather information about an instance to be migrated.
1025
1026   @type instance: L{objects.Instance}
1027   @param instance: the instance definition
1028
1029   """
1030   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1031   try:
1032     info = hyper.MigrationInfo(instance)
1033   except errors.HypervisorError, err:
1034     _Fail("Failed to fetch migration information: %s", err, exc=True)
1035   return (True, info)
1036
1037
1038 def AcceptInstance(instance, info, target):
1039   """Prepare the node to accept an instance.
1040
1041   @type instance: L{objects.Instance}
1042   @param instance: the instance definition
1043   @type info: string/data (opaque)
1044   @param info: migration information, from the source node
1045   @type target: string
1046   @param target: target host (usually ip), on this node
1047
1048   """
1049   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1050   try:
1051     hyper.AcceptInstance(instance, info, target)
1052   except errors.HypervisorError, err:
1053     _Fail("Failed to accept instance: %s", err, exc=True)
1054   return (True, "Accept successfull")
1055
1056
1057 def FinalizeMigration(instance, info, success):
1058   """Finalize any preparation to accept an instance.
1059
1060   @type instance: L{objects.Instance}
1061   @param instance: the instance definition
1062   @type info: string/data (opaque)
1063   @param info: migration information, from the source node
1064   @type success: boolean
1065   @param success: whether the migration was a success or a failure
1066
1067   """
1068   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1069   try:
1070     hyper.FinalizeMigration(instance, info, success)
1071   except errors.HypervisorError, err:
1072     _Fail("Failed to finalize migration: %s", err, exc=True)
1073   return (True, "Migration Finalized")
1074
1075
1076 def MigrateInstance(instance, target, live):
1077   """Migrates an instance to another node.
1078
1079   @type instance: L{objects.Instance}
1080   @param instance: the instance definition
1081   @type target: string
1082   @param target: the target node name
1083   @type live: boolean
1084   @param live: whether the migration should be done live or not (the
1085       interpretation of this parameter is left to the hypervisor)
1086   @rtype: tuple
1087   @return: a tuple of (success, msg) where:
1088       - succes is a boolean denoting the success/failure of the operation
1089       - msg is a string with details in case of failure
1090
1091   """
1092   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1093
1094   try:
1095     hyper.MigrateInstance(instance.name, target, live)
1096   except errors.HypervisorError, err:
1097     _Fail("Failed to migrate instance: %s", err, exc=True)
1098   return (True, "Migration successfull")
1099
1100
1101 def BlockdevCreate(disk, size, owner, on_primary, info):
1102   """Creates a block device for an instance.
1103
1104   @type disk: L{objects.Disk}
1105   @param disk: the object describing the disk we should create
1106   @type size: int
1107   @param size: the size of the physical underlying device, in MiB
1108   @type owner: str
1109   @param owner: the name of the instance for which disk is created,
1110       used for device cache data
1111   @type on_primary: boolean
1112   @param on_primary:  indicates if it is the primary node or not
1113   @type info: string
1114   @param info: string that will be sent to the physical device
1115       creation, used for example to set (LVM) tags on LVs
1116
1117   @return: the new unique_id of the device (this can sometime be
1118       computed only after creation), or None. On secondary nodes,
1119       it's not required to return anything.
1120
1121   """
1122   clist = []
1123   if disk.children:
1124     for child in disk.children:
1125       try:
1126         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1127       except errors.BlockDeviceError, err:
1128         _Fail("Can't assemble device %s: %s", child, err)
1129       if on_primary or disk.AssembleOnSecondary():
1130         # we need the children open in case the device itself has to
1131         # be assembled
1132         try:
1133           crdev.Open()
1134         except errors.BlockDeviceError, err:
1135           _Fail("Can't make child '%s' read-write: %s", child, err)
1136       clist.append(crdev)
1137
1138   try:
1139     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1140   except errors.BlockDeviceError, err:
1141     _Fail("Can't create block device: %s", err)
1142
1143   if on_primary or disk.AssembleOnSecondary():
1144     try:
1145       device.Assemble()
1146     except errors.BlockDeviceError, err:
1147       _Fail("Can't assemble device after creation, unusual event: %s", err)
1148     device.SetSyncSpeed(constants.SYNC_SPEED)
1149     if on_primary or disk.OpenOnSecondary():
1150       try:
1151         device.Open(force=True)
1152       except errors.BlockDeviceError, err:
1153         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1154     DevCacheManager.UpdateCache(device.dev_path, owner,
1155                                 on_primary, disk.iv_name)
1156
1157   device.SetInfo(info)
1158
1159   physical_id = device.unique_id
1160   return True, physical_id
1161
1162
1163 def BlockdevRemove(disk):
1164   """Remove a block device.
1165
1166   @note: This is intended to be called recursively.
1167
1168   @type disk: L{objects.Disk}
1169   @param disk: the disk object we should remove
1170   @rtype: boolean
1171   @return: the success of the operation
1172
1173   """
1174   msgs = []
1175   result = True
1176   try:
1177     rdev = _RecursiveFindBD(disk)
1178   except errors.BlockDeviceError, err:
1179     # probably can't attach
1180     logging.info("Can't attach to device %s in remove", disk)
1181     rdev = None
1182   if rdev is not None:
1183     r_path = rdev.dev_path
1184     try:
1185       rdev.Remove()
1186     except errors.BlockDeviceError, err:
1187       msgs.append(str(err))
1188       result = False
1189     if result:
1190       DevCacheManager.RemoveCache(r_path)
1191
1192   if disk.children:
1193     for child in disk.children:
1194       c_status, c_msg = BlockdevRemove(child)
1195       result = result and c_status
1196       if c_msg: # not an empty message
1197         msgs.append(c_msg)
1198
1199   return (result, "; ".join(msgs))
1200
1201
1202 def _RecursiveAssembleBD(disk, owner, as_primary):
1203   """Activate a block device for an instance.
1204
1205   This is run on the primary and secondary nodes for an instance.
1206
1207   @note: this function is called recursively.
1208
1209   @type disk: L{objects.Disk}
1210   @param disk: the disk we try to assemble
1211   @type owner: str
1212   @param owner: the name of the instance which owns the disk
1213   @type as_primary: boolean
1214   @param as_primary: if we should make the block device
1215       read/write
1216
1217   @return: the assembled device or None (in case no device
1218       was assembled)
1219   @raise errors.BlockDeviceError: in case there is an error
1220       during the activation of the children or the device
1221       itself
1222
1223   """
1224   children = []
1225   if disk.children:
1226     mcn = disk.ChildrenNeeded()
1227     if mcn == -1:
1228       mcn = 0 # max number of Nones allowed
1229     else:
1230       mcn = len(disk.children) - mcn # max number of Nones
1231     for chld_disk in disk.children:
1232       try:
1233         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1234       except errors.BlockDeviceError, err:
1235         if children.count(None) >= mcn:
1236           raise
1237         cdev = None
1238         logging.error("Error in child activation (but continuing): %s",
1239                       str(err))
1240       children.append(cdev)
1241
1242   if as_primary or disk.AssembleOnSecondary():
1243     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1244     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1245     result = r_dev
1246     if as_primary or disk.OpenOnSecondary():
1247       r_dev.Open()
1248     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1249                                 as_primary, disk.iv_name)
1250
1251   else:
1252     result = True
1253   return result
1254
1255
1256 def BlockdevAssemble(disk, owner, as_primary):
1257   """Activate a block device for an instance.
1258
1259   This is a wrapper over _RecursiveAssembleBD.
1260
1261   @rtype: str or boolean
1262   @return: a C{/dev/...} path for primary nodes, and
1263       C{True} for secondary nodes
1264
1265   """
1266   status = True
1267   result = "no error information"
1268   try:
1269     result = _RecursiveAssembleBD(disk, owner, as_primary)
1270     if isinstance(result, bdev.BlockDev):
1271       result = result.dev_path
1272   except errors.BlockDeviceError, err:
1273     result = "Error while assembling disk: %s" % str(err)
1274     status = False
1275   return (status, result)
1276
1277
1278 def BlockdevShutdown(disk):
1279   """Shut down a block device.
1280
1281   First, if the device is assembled (Attach() is successfull), then
1282   the device is shutdown. Then the children of the device are
1283   shutdown.
1284
1285   This function is called recursively. Note that we don't cache the
1286   children or such, as oppossed to assemble, shutdown of different
1287   devices doesn't require that the upper device was active.
1288
1289   @type disk: L{objects.Disk}
1290   @param disk: the description of the disk we should
1291       shutdown
1292   @rtype: boolean
1293   @return: the success of the operation
1294
1295   """
1296   msgs = []
1297   result = True
1298   r_dev = _RecursiveFindBD(disk)
1299   if r_dev is not None:
1300     r_path = r_dev.dev_path
1301     try:
1302       r_dev.Shutdown()
1303       DevCacheManager.RemoveCache(r_path)
1304     except errors.BlockDeviceError, err:
1305       msgs.append(str(err))
1306       result = False
1307
1308   if disk.children:
1309     for child in disk.children:
1310       c_status, c_msg = BlockdevShutdown(child)
1311       result = result and c_status
1312       if c_msg: # not an empty message
1313         msgs.append(c_msg)
1314
1315   return (result, "; ".join(msgs))
1316
1317
1318 def BlockdevAddchildren(parent_cdev, new_cdevs):
1319   """Extend a mirrored block device.
1320
1321   @type parent_cdev: L{objects.Disk}
1322   @param parent_cdev: the disk to which we should add children
1323   @type new_cdevs: list of L{objects.Disk}
1324   @param new_cdevs: the list of children which we should add
1325   @rtype: boolean
1326   @return: the success of the operation
1327
1328   """
1329   parent_bdev = _RecursiveFindBD(parent_cdev)
1330   if parent_bdev is None:
1331     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1332   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1333   if new_bdevs.count(None) > 0:
1334     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1335   parent_bdev.AddChildren(new_bdevs)
1336   return (True, None)
1337
1338
1339 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1340   """Shrink a mirrored block device.
1341
1342   @type parent_cdev: L{objects.Disk}
1343   @param parent_cdev: the disk from which we should remove children
1344   @type new_cdevs: list of L{objects.Disk}
1345   @param new_cdevs: the list of children which we should remove
1346   @rtype: boolean
1347   @return: the success of the operation
1348
1349   """
1350   parent_bdev = _RecursiveFindBD(parent_cdev)
1351   if parent_bdev is None:
1352     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1353   devs = []
1354   for disk in new_cdevs:
1355     rpath = disk.StaticDevPath()
1356     if rpath is None:
1357       bd = _RecursiveFindBD(disk)
1358       if bd is None:
1359         _Fail("Can't find device %s while removing children", disk)
1360       else:
1361         devs.append(bd.dev_path)
1362     else:
1363       devs.append(rpath)
1364   parent_bdev.RemoveChildren(devs)
1365   return (True, None)
1366
1367
1368 def BlockdevGetmirrorstatus(disks):
1369   """Get the mirroring status of a list of devices.
1370
1371   @type disks: list of L{objects.Disk}
1372   @param disks: the list of disks which we should query
1373   @rtype: disk
1374   @return:
1375       a list of (mirror_done, estimated_time) tuples, which
1376       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1377   @raise errors.BlockDeviceError: if any of the disks cannot be
1378       found
1379
1380   """
1381   stats = []
1382   for dsk in disks:
1383     rbd = _RecursiveFindBD(dsk)
1384     if rbd is None:
1385       _Fail("Can't find device %s", dsk)
1386     stats.append(rbd.CombinedSyncStatus())
1387   return True, stats
1388
1389
1390 def _RecursiveFindBD(disk):
1391   """Check if a device is activated.
1392
1393   If so, return informations about the real device.
1394
1395   @type disk: L{objects.Disk}
1396   @param disk: the disk object we need to find
1397
1398   @return: None if the device can't be found,
1399       otherwise the device instance
1400
1401   """
1402   children = []
1403   if disk.children:
1404     for chdisk in disk.children:
1405       children.append(_RecursiveFindBD(chdisk))
1406
1407   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1408
1409
1410 def BlockdevFind(disk):
1411   """Check if a device is activated.
1412
1413   If it is, return informations about the real device.
1414
1415   @type disk: L{objects.Disk}
1416   @param disk: the disk to find
1417   @rtype: None or tuple
1418   @return: None if the disk cannot be found, otherwise a
1419       tuple (device_path, major, minor, sync_percent,
1420       estimated_time, is_degraded)
1421
1422   """
1423   try:
1424     rbd = _RecursiveFindBD(disk)
1425   except errors.BlockDeviceError, err:
1426     _Fail("Failed to find device: %s", err, exc=True)
1427   if rbd is None:
1428     return (True, None)
1429   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1430
1431
1432 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1433   """Write a file to the filesystem.
1434
1435   This allows the master to overwrite(!) a file. It will only perform
1436   the operation if the file belongs to a list of configuration files.
1437
1438   @type file_name: str
1439   @param file_name: the target file name
1440   @type data: str
1441   @param data: the new contents of the file
1442   @type mode: int
1443   @param mode: the mode to give the file (can be None)
1444   @type uid: int
1445   @param uid: the owner of the file (can be -1 for default)
1446   @type gid: int
1447   @param gid: the group of the file (can be -1 for default)
1448   @type atime: float
1449   @param atime: the atime to set on the file (can be None)
1450   @type mtime: float
1451   @param mtime: the mtime to set on the file (can be None)
1452   @rtype: boolean
1453   @return: the success of the operation; errors are logged
1454       in the node daemon log
1455
1456   """
1457   if not os.path.isabs(file_name):
1458     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1459
1460   allowed_files = set([
1461     constants.CLUSTER_CONF_FILE,
1462     constants.ETC_HOSTS,
1463     constants.SSH_KNOWN_HOSTS_FILE,
1464     constants.VNC_PASSWORD_FILE,
1465     constants.RAPI_CERT_FILE,
1466     constants.RAPI_USERS_FILE,
1467     ])
1468
1469   for hv_name in constants.HYPER_TYPES:
1470     hv_class = hypervisor.GetHypervisor(hv_name)
1471     allowed_files.update(hv_class.GetAncillaryFiles())
1472
1473   if file_name not in allowed_files:
1474     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1475           file_name)
1476
1477   raw_data = _Decompress(data)
1478
1479   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1480                   atime=atime, mtime=mtime)
1481   return (True, "success")
1482
1483
1484 def WriteSsconfFiles(values):
1485   """Update all ssconf files.
1486
1487   Wrapper around the SimpleStore.WriteFiles.
1488
1489   """
1490   ssconf.SimpleStore().WriteFiles(values)
1491
1492
1493 def _ErrnoOrStr(err):
1494   """Format an EnvironmentError exception.
1495
1496   If the L{err} argument has an errno attribute, it will be looked up
1497   and converted into a textual C{E...} description. Otherwise the
1498   string representation of the error will be returned.
1499
1500   @type err: L{EnvironmentError}
1501   @param err: the exception to format
1502
1503   """
1504   if hasattr(err, 'errno'):
1505     detail = errno.errorcode[err.errno]
1506   else:
1507     detail = str(err)
1508   return detail
1509
1510
1511 def _OSOndiskVersion(name, os_dir):
1512   """Compute and return the API version of a given OS.
1513
1514   This function will try to read the API version of the OS given by
1515   the 'name' parameter and residing in the 'os_dir' directory.
1516
1517   @type name: str
1518   @param name: the OS name we should look for
1519   @type os_dir: str
1520   @param os_dir: the directory inwhich we should look for the OS
1521   @rtype: int or None
1522   @return:
1523       Either an integer denoting the version or None in the
1524       case when this is not a valid OS name.
1525   @raise errors.InvalidOS: if the OS cannot be found
1526
1527   """
1528   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1529
1530   try:
1531     st = os.stat(api_file)
1532   except EnvironmentError, err:
1533     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1534                            " found (%s)" % _ErrnoOrStr(err))
1535
1536   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1537     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1538                            " a regular file")
1539
1540   try:
1541     f = open(api_file)
1542     try:
1543       api_versions = f.readlines()
1544     finally:
1545       f.close()
1546   except EnvironmentError, err:
1547     raise errors.InvalidOS(name, os_dir, "error while reading the"
1548                            " API version (%s)" % _ErrnoOrStr(err))
1549
1550   api_versions = [version.strip() for version in api_versions]
1551   try:
1552     api_versions = [int(version) for version in api_versions]
1553   except (TypeError, ValueError), err:
1554     raise errors.InvalidOS(name, os_dir,
1555                            "API version is not integer (%s)" % str(err))
1556
1557   return api_versions
1558
1559
1560 def DiagnoseOS(top_dirs=None):
1561   """Compute the validity for all OSes.
1562
1563   @type top_dirs: list
1564   @param top_dirs: the list of directories in which to
1565       search (if not given defaults to
1566       L{constants.OS_SEARCH_PATH})
1567   @rtype: list of L{objects.OS}
1568   @return: an OS object for each name in all the given
1569       directories
1570
1571   """
1572   if top_dirs is None:
1573     top_dirs = constants.OS_SEARCH_PATH
1574
1575   result = []
1576   for dir_name in top_dirs:
1577     if os.path.isdir(dir_name):
1578       try:
1579         f_names = utils.ListVisibleFiles(dir_name)
1580       except EnvironmentError, err:
1581         logging.exception("Can't list the OS directory %s", dir_name)
1582         break
1583       for name in f_names:
1584         try:
1585           os_inst = OSFromDisk(name, base_dir=dir_name)
1586           result.append(os_inst)
1587         except errors.InvalidOS, err:
1588           result.append(objects.OS.FromInvalidOS(err))
1589
1590   return result
1591
1592
1593 def OSFromDisk(name, base_dir=None):
1594   """Create an OS instance from disk.
1595
1596   This function will return an OS instance if the given name is a
1597   valid OS name. Otherwise, it will raise an appropriate
1598   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1599
1600   @type base_dir: string
1601   @keyword base_dir: Base directory containing OS installations.
1602                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1603   @rtype: L{objects.OS}
1604   @return: the OS instance if we find a valid one
1605   @raise errors.InvalidOS: if we don't find a valid OS
1606
1607   """
1608   if base_dir is None:
1609     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1610     if os_dir is None:
1611       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1612   else:
1613     os_dir = os.path.sep.join([base_dir, name])
1614
1615   api_versions = _OSOndiskVersion(name, os_dir)
1616
1617   if constants.OS_API_VERSION not in api_versions:
1618     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1619                            " (found %s want %s)"
1620                            % (api_versions, constants.OS_API_VERSION))
1621
1622   # OS Scripts dictionary, we will populate it with the actual script names
1623   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1624
1625   for script in os_scripts:
1626     os_scripts[script] = os.path.sep.join([os_dir, script])
1627
1628     try:
1629       st = os.stat(os_scripts[script])
1630     except EnvironmentError, err:
1631       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1632                              (script, _ErrnoOrStr(err)))
1633
1634     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1635       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1636                              script)
1637
1638     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1639       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1640                              script)
1641
1642
1643   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1644                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1645                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1646                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1647                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1648                     api_versions=api_versions)
1649
1650 def OSEnvironment(instance, debug=0):
1651   """Calculate the environment for an os script.
1652
1653   @type instance: L{objects.Instance}
1654   @param instance: target instance for the os script run
1655   @type debug: integer
1656   @param debug: debug level (0 or 1, for OS Api 10)
1657   @rtype: dict
1658   @return: dict of environment variables
1659   @raise errors.BlockDeviceError: if the block device
1660       cannot be found
1661
1662   """
1663   result = {}
1664   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1665   result['INSTANCE_NAME'] = instance.name
1666   result['INSTANCE_OS'] = instance.os
1667   result['HYPERVISOR'] = instance.hypervisor
1668   result['DISK_COUNT'] = '%d' % len(instance.disks)
1669   result['NIC_COUNT'] = '%d' % len(instance.nics)
1670   result['DEBUG_LEVEL'] = '%d' % debug
1671   for idx, disk in enumerate(instance.disks):
1672     real_disk = _RecursiveFindBD(disk)
1673     if real_disk is None:
1674       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1675                                     str(disk))
1676     real_disk.Open()
1677     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1678     result['DISK_%d_ACCESS' % idx] = disk.mode
1679     if constants.HV_DISK_TYPE in instance.hvparams:
1680       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1681         instance.hvparams[constants.HV_DISK_TYPE]
1682     if disk.dev_type in constants.LDS_BLOCK:
1683       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1684     elif disk.dev_type == constants.LD_FILE:
1685       result['DISK_%d_BACKEND_TYPE' % idx] = \
1686         'file:%s' % disk.physical_id[0]
1687   for idx, nic in enumerate(instance.nics):
1688     result['NIC_%d_MAC' % idx] = nic.mac
1689     if nic.ip:
1690       result['NIC_%d_IP' % idx] = nic.ip
1691     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1692     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1693       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1694     if nic.nicparams[constants.NIC_LINK]:
1695       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1696     if constants.HV_NIC_TYPE in instance.hvparams:
1697       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1698         instance.hvparams[constants.HV_NIC_TYPE]
1699
1700   return result
1701
1702 def BlockdevGrow(disk, amount):
1703   """Grow a stack of block devices.
1704
1705   This function is called recursively, with the childrens being the
1706   first ones to resize.
1707
1708   @type disk: L{objects.Disk}
1709   @param disk: the disk to be grown
1710   @rtype: (status, result)
1711   @return: a tuple with the status of the operation
1712       (True/False), and the errors message if status
1713       is False
1714
1715   """
1716   r_dev = _RecursiveFindBD(disk)
1717   if r_dev is None:
1718     return False, "Cannot find block device %s" % (disk,)
1719
1720   try:
1721     r_dev.Grow(amount)
1722   except errors.BlockDeviceError, err:
1723     _Fail("Failed to grow block device: %s", err, exc=True)
1724
1725   return True, None
1726
1727
1728 def BlockdevSnapshot(disk):
1729   """Create a snapshot copy of a block device.
1730
1731   This function is called recursively, and the snapshot is actually created
1732   just for the leaf lvm backend device.
1733
1734   @type disk: L{objects.Disk}
1735   @param disk: the disk to be snapshotted
1736   @rtype: string
1737   @return: snapshot disk path
1738
1739   """
1740   if disk.children:
1741     if len(disk.children) == 1:
1742       # only one child, let's recurse on it
1743       return BlockdevSnapshot(disk.children[0])
1744     else:
1745       # more than one child, choose one that matches
1746       for child in disk.children:
1747         if child.size == disk.size:
1748           # return implies breaking the loop
1749           return BlockdevSnapshot(child)
1750   elif disk.dev_type == constants.LD_LV:
1751     r_dev = _RecursiveFindBD(disk)
1752     if r_dev is not None:
1753       # let's stay on the safe side and ask for the full size, for now
1754       return True, r_dev.Snapshot(disk.size)
1755     else:
1756       _Fail("Cannot find block device %s", disk)
1757   else:
1758     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1759           disk.unique_id, disk.dev_type)
1760
1761
1762 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1763   """Export a block device snapshot to a remote node.
1764
1765   @type disk: L{objects.Disk}
1766   @param disk: the description of the disk to export
1767   @type dest_node: str
1768   @param dest_node: the destination node to export to
1769   @type instance: L{objects.Instance}
1770   @param instance: the instance object to whom the disk belongs
1771   @type cluster_name: str
1772   @param cluster_name: the cluster name, needed for SSH hostalias
1773   @type idx: int
1774   @param idx: the index of the disk in the instance's disk list,
1775       used to export to the OS scripts environment
1776   @rtype: boolean
1777   @return: the success of the operation
1778
1779   """
1780   export_env = OSEnvironment(instance)
1781
1782   inst_os = OSFromDisk(instance.os)
1783   export_script = inst_os.export_script
1784
1785   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1786                                      instance.name, int(time.time()))
1787   if not os.path.exists(constants.LOG_OS_DIR):
1788     os.mkdir(constants.LOG_OS_DIR, 0750)
1789   real_disk = _RecursiveFindBD(disk)
1790   if real_disk is None:
1791     _Fail("Block device '%s' is not set up", disk)
1792
1793   real_disk.Open()
1794
1795   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1796   export_env['EXPORT_INDEX'] = str(idx)
1797
1798   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1799   destfile = disk.physical_id[1]
1800
1801   # the target command is built out of three individual commands,
1802   # which are joined by pipes; we check each individual command for
1803   # valid parameters
1804   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1805                                export_script, logfile)
1806
1807   comprcmd = "gzip"
1808
1809   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1810                                 destdir, destdir, destfile)
1811   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1812                                                    constants.GANETI_RUNAS,
1813                                                    destcmd)
1814
1815   # all commands have been checked, so we're safe to combine them
1816   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1817
1818   result = utils.RunCmd(command, env=export_env)
1819
1820   if result.failed:
1821     _Fail("OS snapshot export command '%s' returned error: %s"
1822           " output: %s", command, result.fail_reason, result.output)
1823
1824   return (True, None)
1825
1826
1827 def FinalizeExport(instance, snap_disks):
1828   """Write out the export configuration information.
1829
1830   @type instance: L{objects.Instance}
1831   @param instance: the instance which we export, used for
1832       saving configuration
1833   @type snap_disks: list of L{objects.Disk}
1834   @param snap_disks: list of snapshot block devices, which
1835       will be used to get the actual name of the dump file
1836
1837   @rtype: boolean
1838   @return: the success of the operation
1839
1840   """
1841   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1842   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1843
1844   config = objects.SerializableConfigParser()
1845
1846   config.add_section(constants.INISECT_EXP)
1847   config.set(constants.INISECT_EXP, 'version', '0')
1848   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1849   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1850   config.set(constants.INISECT_EXP, 'os', instance.os)
1851   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1852
1853   config.add_section(constants.INISECT_INS)
1854   config.set(constants.INISECT_INS, 'name', instance.name)
1855   config.set(constants.INISECT_INS, 'memory', '%d' %
1856              instance.beparams[constants.BE_MEMORY])
1857   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1858              instance.beparams[constants.BE_VCPUS])
1859   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1860
1861   nic_total = 0
1862   for nic_count, nic in enumerate(instance.nics):
1863     nic_total += 1
1864     config.set(constants.INISECT_INS, 'nic%d_mac' %
1865                nic_count, '%s' % nic.mac)
1866     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1867     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1868                '%s' % nic.bridge)
1869   # TODO: redundant: on load can read nics until it doesn't exist
1870   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1871
1872   disk_total = 0
1873   for disk_count, disk in enumerate(snap_disks):
1874     if disk:
1875       disk_total += 1
1876       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1877                  ('%s' % disk.iv_name))
1878       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1879                  ('%s' % disk.physical_id[1]))
1880       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1881                  ('%d' % disk.size))
1882
1883   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1884
1885   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1886                   data=config.Dumps())
1887   shutil.rmtree(finaldestdir, True)
1888   shutil.move(destdir, finaldestdir)
1889
1890   return True, None
1891
1892
1893 def ExportInfo(dest):
1894   """Get export configuration information.
1895
1896   @type dest: str
1897   @param dest: directory containing the export
1898
1899   @rtype: L{objects.SerializableConfigParser}
1900   @return: a serializable config file containing the
1901       export info
1902
1903   """
1904   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1905
1906   config = objects.SerializableConfigParser()
1907   config.read(cff)
1908
1909   if (not config.has_section(constants.INISECT_EXP) or
1910       not config.has_section(constants.INISECT_INS)):
1911     _Fail("Export info file doesn't have the required fields")
1912
1913   return True, config.Dumps()
1914
1915
1916 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1917   """Import an os image into an instance.
1918
1919   @type instance: L{objects.Instance}
1920   @param instance: instance to import the disks into
1921   @type src_node: string
1922   @param src_node: source node for the disk images
1923   @type src_images: list of string
1924   @param src_images: absolute paths of the disk images
1925   @rtype: list of boolean
1926   @return: each boolean represent the success of importing the n-th disk
1927
1928   """
1929   import_env = OSEnvironment(instance)
1930   inst_os = OSFromDisk(instance.os)
1931   import_script = inst_os.import_script
1932
1933   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1934                                         instance.name, int(time.time()))
1935   if not os.path.exists(constants.LOG_OS_DIR):
1936     os.mkdir(constants.LOG_OS_DIR, 0750)
1937
1938   comprcmd = "gunzip"
1939   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1940                                import_script, logfile)
1941
1942   final_result = []
1943   for idx, image in enumerate(src_images):
1944     if image:
1945       destcmd = utils.BuildShellCmd('cat %s', image)
1946       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1947                                                        constants.GANETI_RUNAS,
1948                                                        destcmd)
1949       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1950       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1951       import_env['IMPORT_INDEX'] = str(idx)
1952       result = utils.RunCmd(command, env=import_env)
1953       if result.failed:
1954         logging.error("Disk import command '%s' returned error: %s"
1955                       " output: %s", command, result.fail_reason,
1956                       result.output)
1957         final_result.append("error importing disk %d: %s, %s" %
1958                             (idx, result.fail_reason, result.output[-100]))
1959
1960   if final_result:
1961     return False, "; ".join(final_result)
1962   return True, None
1963
1964
1965 def ListExports():
1966   """Return a list of exports currently available on this machine.
1967
1968   @rtype: list
1969   @return: list of the exports
1970
1971   """
1972   if os.path.isdir(constants.EXPORT_DIR):
1973     return True, utils.ListVisibleFiles(constants.EXPORT_DIR)
1974   else:
1975     return False, "No exports directory"
1976
1977
1978 def RemoveExport(export):
1979   """Remove an existing export from the node.
1980
1981   @type export: str
1982   @param export: the name of the export to remove
1983   @rtype: boolean
1984   @return: the success of the operation
1985
1986   """
1987   target = os.path.join(constants.EXPORT_DIR, export)
1988
1989   try:
1990     shutil.rmtree(target)
1991   except EnvironmentError, err:
1992     _Fail("Error while removing the export: %s", err, exc=True)
1993
1994   return True, None
1995
1996
1997 def BlockdevRename(devlist):
1998   """Rename a list of block devices.
1999
2000   @type devlist: list of tuples
2001   @param devlist: list of tuples of the form  (disk,
2002       new_logical_id, new_physical_id); disk is an
2003       L{objects.Disk} object describing the current disk,
2004       and new logical_id/physical_id is the name we
2005       rename it to
2006   @rtype: boolean
2007   @return: True if all renames succeeded, False otherwise
2008
2009   """
2010   msgs = []
2011   result = True
2012   for disk, unique_id in devlist:
2013     dev = _RecursiveFindBD(disk)
2014     if dev is None:
2015       msgs.append("Can't find device %s in rename" % str(disk))
2016       result = False
2017       continue
2018     try:
2019       old_rpath = dev.dev_path
2020       dev.Rename(unique_id)
2021       new_rpath = dev.dev_path
2022       if old_rpath != new_rpath:
2023         DevCacheManager.RemoveCache(old_rpath)
2024         # FIXME: we should add the new cache information here, like:
2025         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2026         # but we don't have the owner here - maybe parse from existing
2027         # cache? for now, we only lose lvm data when we rename, which
2028         # is less critical than DRBD or MD
2029     except errors.BlockDeviceError, err:
2030       msgs.append("Can't rename device '%s' to '%s': %s" %
2031                   (dev, unique_id, err))
2032       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2033       result = False
2034   return (result, "; ".join(msgs))
2035
2036
2037 def _TransformFileStorageDir(file_storage_dir):
2038   """Checks whether given file_storage_dir is valid.
2039
2040   Checks wheter the given file_storage_dir is within the cluster-wide
2041   default file_storage_dir stored in SimpleStore. Only paths under that
2042   directory are allowed.
2043
2044   @type file_storage_dir: str
2045   @param file_storage_dir: the path to check
2046
2047   @return: the normalized path if valid, None otherwise
2048
2049   """
2050   cfg = _GetConfig()
2051   file_storage_dir = os.path.normpath(file_storage_dir)
2052   base_file_storage_dir = cfg.GetFileStorageDir()
2053   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2054       base_file_storage_dir):
2055     logging.error("file storage directory '%s' is not under base file"
2056                   " storage directory '%s'",
2057                   file_storage_dir, base_file_storage_dir)
2058     return None
2059   return file_storage_dir
2060
2061
2062 def CreateFileStorageDir(file_storage_dir):
2063   """Create file storage directory.
2064
2065   @type file_storage_dir: str
2066   @param file_storage_dir: directory to create
2067
2068   @rtype: tuple
2069   @return: tuple with first element a boolean indicating wheter dir
2070       creation was successful or not
2071
2072   """
2073   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2074   result = True,
2075   if not file_storage_dir:
2076     result = False,
2077   else:
2078     if os.path.exists(file_storage_dir):
2079       if not os.path.isdir(file_storage_dir):
2080         logging.error("'%s' is not a directory", file_storage_dir)
2081         result = False,
2082     else:
2083       try:
2084         os.makedirs(file_storage_dir, 0750)
2085       except OSError, err:
2086         logging.error("Cannot create file storage directory '%s': %s",
2087                       file_storage_dir, err)
2088         result = False,
2089   return result
2090
2091
2092 def RemoveFileStorageDir(file_storage_dir):
2093   """Remove file storage directory.
2094
2095   Remove it only if it's empty. If not log an error and return.
2096
2097   @type file_storage_dir: str
2098   @param file_storage_dir: the directory we should cleanup
2099   @rtype: tuple (success,)
2100   @return: tuple of one element, C{success}, denoting
2101       whether the operation was successfull
2102
2103   """
2104   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2105   result = True,
2106   if not file_storage_dir:
2107     result = False,
2108   else:
2109     if os.path.exists(file_storage_dir):
2110       if not os.path.isdir(file_storage_dir):
2111         logging.error("'%s' is not a directory", file_storage_dir)
2112         result = False,
2113       # deletes dir only if empty, otherwise we want to return False
2114       try:
2115         os.rmdir(file_storage_dir)
2116       except OSError, err:
2117         logging.exception("Cannot remove file storage directory '%s'",
2118                           file_storage_dir)
2119         result = False,
2120   return result
2121
2122
2123 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2124   """Rename the file storage directory.
2125
2126   @type old_file_storage_dir: str
2127   @param old_file_storage_dir: the current path
2128   @type new_file_storage_dir: str
2129   @param new_file_storage_dir: the name we should rename to
2130   @rtype: tuple (success,)
2131   @return: tuple of one element, C{success}, denoting
2132       whether the operation was successful
2133
2134   """
2135   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2136   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2137   result = True,
2138   if not old_file_storage_dir or not new_file_storage_dir:
2139     result = False,
2140   else:
2141     if not os.path.exists(new_file_storage_dir):
2142       if os.path.isdir(old_file_storage_dir):
2143         try:
2144           os.rename(old_file_storage_dir, new_file_storage_dir)
2145         except OSError, err:
2146           logging.exception("Cannot rename '%s' to '%s'",
2147                             old_file_storage_dir, new_file_storage_dir)
2148           result =  False,
2149       else:
2150         logging.error("'%s' is not a directory", old_file_storage_dir)
2151         result = False,
2152     else:
2153       if os.path.exists(old_file_storage_dir):
2154         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2155                       old_file_storage_dir, new_file_storage_dir)
2156         result = False,
2157   return result
2158
2159
2160 def _IsJobQueueFile(file_name):
2161   """Checks whether the given filename is in the queue directory.
2162
2163   @type file_name: str
2164   @param file_name: the file name we should check
2165   @rtype: boolean
2166   @return: whether the file is under the queue directory
2167
2168   """
2169   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2170   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2171
2172   if not result:
2173     logging.error("'%s' is not a file in the queue directory",
2174                   file_name)
2175
2176   return result
2177
2178
2179 def JobQueueUpdate(file_name, content):
2180   """Updates a file in the queue directory.
2181
2182   This is just a wrapper over L{utils.WriteFile}, with proper
2183   checking.
2184
2185   @type file_name: str
2186   @param file_name: the job file name
2187   @type content: str
2188   @param content: the new job contents
2189   @rtype: boolean
2190   @return: the success of the operation
2191
2192   """
2193   if not _IsJobQueueFile(file_name):
2194     return False
2195
2196   # Write and replace the file atomically
2197   utils.WriteFile(file_name, data=_Decompress(content))
2198
2199   return True
2200
2201
2202 def JobQueueRename(old, new):
2203   """Renames a job queue file.
2204
2205   This is just a wrapper over os.rename with proper checking.
2206
2207   @type old: str
2208   @param old: the old (actual) file name
2209   @type new: str
2210   @param new: the desired file name
2211   @rtype: boolean
2212   @return: the success of the operation
2213
2214   """
2215   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2216     return False
2217
2218   utils.RenameFile(old, new, mkdir=True)
2219
2220   return True
2221
2222
2223 def JobQueueSetDrainFlag(drain_flag):
2224   """Set the drain flag for the queue.
2225
2226   This will set or unset the queue drain flag.
2227
2228   @type drain_flag: boolean
2229   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2230   @rtype: boolean
2231   @return: always True
2232   @warning: the function always returns True
2233
2234   """
2235   if drain_flag:
2236     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2237   else:
2238     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2239
2240   return True
2241
2242
2243 def BlockdevClose(instance_name, disks):
2244   """Closes the given block devices.
2245
2246   This means they will be switched to secondary mode (in case of
2247   DRBD).
2248
2249   @param instance_name: if the argument is not empty, the symlinks
2250       of this instance will be removed
2251   @type disks: list of L{objects.Disk}
2252   @param disks: the list of disks to be closed
2253   @rtype: tuple (success, message)
2254   @return: a tuple of success and message, where success
2255       indicates the succes of the operation, and message
2256       which will contain the error details in case we
2257       failed
2258
2259   """
2260   bdevs = []
2261   for cf in disks:
2262     rd = _RecursiveFindBD(cf)
2263     if rd is None:
2264       _Fail("Can't find device %s", cf)
2265     bdevs.append(rd)
2266
2267   msg = []
2268   for rd in bdevs:
2269     try:
2270       rd.Close()
2271     except errors.BlockDeviceError, err:
2272       msg.append(str(err))
2273   if msg:
2274     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2275   else:
2276     if instance_name:
2277       _RemoveBlockDevLinks(instance_name, disks)
2278     return (True, "All devices secondary")
2279
2280
2281 def ValidateHVParams(hvname, hvparams):
2282   """Validates the given hypervisor parameters.
2283
2284   @type hvname: string
2285   @param hvname: the hypervisor name
2286   @type hvparams: dict
2287   @param hvparams: the hypervisor parameters to be validated
2288   @rtype: tuple (success, message)
2289   @return: a tuple of success and message, where success
2290       indicates the succes of the operation, and message
2291       which will contain the error details in case we
2292       failed
2293
2294   """
2295   try:
2296     hv_type = hypervisor.GetHypervisor(hvname)
2297     hv_type.ValidateParameters(hvparams)
2298     return (True, "Validation passed")
2299   except errors.HypervisorError, err:
2300     return (False, str(err))
2301
2302
2303 def DemoteFromMC():
2304   """Demotes the current node from master candidate role.
2305
2306   """
2307   # try to ensure we're not the master by mistake
2308   master, myself = ssconf.GetMasterAndMyself()
2309   if master == myself:
2310     return (False, "ssconf status shows I'm the master node, will not demote")
2311   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2312   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2313     return (False, "The master daemon is running, will not demote")
2314   try:
2315     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2316   except EnvironmentError, err:
2317     if err.errno != errno.ENOENT:
2318       return (False, "Error while backing up cluster file: %s" % str(err))
2319   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2320   return (True, "Done")
2321
2322
2323 def _FindDisks(nodes_ip, disks):
2324   """Sets the physical ID on disks and returns the block devices.
2325
2326   """
2327   # set the correct physical ID
2328   my_name = utils.HostInfo().name
2329   for cf in disks:
2330     cf.SetPhysicalID(my_name, nodes_ip)
2331
2332   bdevs = []
2333
2334   for cf in disks:
2335     rd = _RecursiveFindBD(cf)
2336     if rd is None:
2337       return (False, "Can't find device %s" % cf)
2338     bdevs.append(rd)
2339   return (True, bdevs)
2340
2341
2342 def DrbdDisconnectNet(nodes_ip, disks):
2343   """Disconnects the network on a list of drbd devices.
2344
2345   """
2346   status, bdevs = _FindDisks(nodes_ip, disks)
2347   if not status:
2348     return status, bdevs
2349
2350   # disconnect disks
2351   for rd in bdevs:
2352     try:
2353       rd.DisconnectNet()
2354     except errors.BlockDeviceError, err:
2355       _Fail("Can't change network configuration to standalone mode: %s",
2356             err, exc=True)
2357   return (True, "All disks are now disconnected")
2358
2359
2360 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2361   """Attaches the network on a list of drbd devices.
2362
2363   """
2364   status, bdevs = _FindDisks(nodes_ip, disks)
2365   if not status:
2366     return status, bdevs
2367
2368   if multimaster:
2369     for idx, rd in enumerate(bdevs):
2370       try:
2371         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2372       except EnvironmentError, err:
2373         _Fail("Can't create symlink: %s", err)
2374   # reconnect disks, switch to new master configuration and if
2375   # needed primary mode
2376   for rd in bdevs:
2377     try:
2378       rd.AttachNet(multimaster)
2379     except errors.BlockDeviceError, err:
2380       _Fail("Can't change network configuration: %s", err)
2381   # wait until the disks are connected; we need to retry the re-attach
2382   # if the device becomes standalone, as this might happen if the one
2383   # node disconnects and reconnects in a different mode before the
2384   # other node reconnects; in this case, one or both of the nodes will
2385   # decide it has wrong configuration and switch to standalone
2386   RECONNECT_TIMEOUT = 2 * 60
2387   sleep_time = 0.100 # start with 100 miliseconds
2388   timeout_limit = time.time() + RECONNECT_TIMEOUT
2389   while time.time() < timeout_limit:
2390     all_connected = True
2391     for rd in bdevs:
2392       stats = rd.GetProcStatus()
2393       if not (stats.is_connected or stats.is_in_resync):
2394         all_connected = False
2395       if stats.is_standalone:
2396         # peer had different config info and this node became
2397         # standalone, even though this should not happen with the
2398         # new staged way of changing disk configs
2399         try:
2400           rd.ReAttachNet(multimaster)
2401         except errors.BlockDeviceError, err:
2402           _Fail("Can't change network configuration: %s", err)
2403     if all_connected:
2404       break
2405     time.sleep(sleep_time)
2406     sleep_time = min(5, sleep_time * 1.5)
2407   if not all_connected:
2408     return (False, "Timeout in disk reconnecting")
2409   if multimaster:
2410     # change to primary mode
2411     for rd in bdevs:
2412       try:
2413         rd.Open()
2414       except errors.BlockDeviceError, err:
2415         _Fail("Can't change to primary mode: %s", err)
2416   if multimaster:
2417     msg = "multi-master and primary"
2418   else:
2419     msg = "single-master"
2420   return (True, "Disks are now configured as %s" % msg)
2421
2422
2423 def DrbdWaitSync(nodes_ip, disks):
2424   """Wait until DRBDs have synchronized.
2425
2426   """
2427   status, bdevs = _FindDisks(nodes_ip, disks)
2428   if not status:
2429     return status, bdevs
2430
2431   min_resync = 100
2432   alldone = True
2433   failure = False
2434   for rd in bdevs:
2435     stats = rd.GetProcStatus()
2436     if not (stats.is_connected or stats.is_in_resync):
2437       failure = True
2438       break
2439     alldone = alldone and (not stats.is_in_resync)
2440     if stats.sync_percent is not None:
2441       min_resync = min(min_resync, stats.sync_percent)
2442   return (not failure, (alldone, min_resync))
2443
2444
2445 def PowercycleNode(hypervisor_type):
2446   """Hard-powercycle the node.
2447
2448   Because we need to return first, and schedule the powercycle in the
2449   background, we won't be able to report failures nicely.
2450
2451   """
2452   hyper = hypervisor.GetHypervisor(hypervisor_type)
2453   try:
2454     pid = os.fork()
2455   except OSError, err:
2456     # if we can't fork, we'll pretend that we're in the child process
2457     pid = 0
2458   if pid > 0:
2459     return (True, "Reboot scheduled in 5 seconds")
2460   time.sleep(5)
2461   hyper.PowercycleNode()
2462
2463
2464 class HooksRunner(object):
2465   """Hook runner.
2466
2467   This class is instantiated on the node side (ganeti-noded) and not
2468   on the master side.
2469
2470   """
2471   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2472
2473   def __init__(self, hooks_base_dir=None):
2474     """Constructor for hooks runner.
2475
2476     @type hooks_base_dir: str or None
2477     @param hooks_base_dir: if not None, this overrides the
2478         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2479
2480     """
2481     if hooks_base_dir is None:
2482       hooks_base_dir = constants.HOOKS_BASE_DIR
2483     self._BASE_DIR = hooks_base_dir
2484
2485   @staticmethod
2486   def ExecHook(script, env):
2487     """Exec one hook script.
2488
2489     @type script: str
2490     @param script: the full path to the script
2491     @type env: dict
2492     @param env: the environment with which to exec the script
2493     @rtype: tuple (success, message)
2494     @return: a tuple of success and message, where success
2495         indicates the succes of the operation, and message
2496         which will contain the error details in case we
2497         failed
2498
2499     """
2500     # exec the process using subprocess and log the output
2501     fdstdin = None
2502     try:
2503       fdstdin = open("/dev/null", "r")
2504       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2505                                stderr=subprocess.STDOUT, close_fds=True,
2506                                shell=False, cwd="/", env=env)
2507       output = ""
2508       try:
2509         output = child.stdout.read(4096)
2510         child.stdout.close()
2511       except EnvironmentError, err:
2512         output += "Hook script error: %s" % str(err)
2513
2514       while True:
2515         try:
2516           result = child.wait()
2517           break
2518         except EnvironmentError, err:
2519           if err.errno == errno.EINTR:
2520             continue
2521           raise
2522     finally:
2523       # try not to leak fds
2524       for fd in (fdstdin, ):
2525         if fd is not None:
2526           try:
2527             fd.close()
2528           except EnvironmentError, err:
2529             # just log the error
2530             #logging.exception("Error while closing fd %s", fd)
2531             pass
2532
2533     return result == 0, utils.SafeEncode(output.strip())
2534
2535   def RunHooks(self, hpath, phase, env):
2536     """Run the scripts in the hooks directory.
2537
2538     @type hpath: str
2539     @param hpath: the path to the hooks directory which
2540         holds the scripts
2541     @type phase: str
2542     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2543         L{constants.HOOKS_PHASE_POST}
2544     @type env: dict
2545     @param env: dictionary with the environment for the hook
2546     @rtype: list
2547     @return: list of 3-element tuples:
2548       - script path
2549       - script result, either L{constants.HKR_SUCCESS} or
2550         L{constants.HKR_FAIL}
2551       - output of the script
2552
2553     @raise errors.ProgrammerError: for invalid input
2554         parameters
2555
2556     """
2557     if phase == constants.HOOKS_PHASE_PRE:
2558       suffix = "pre"
2559     elif phase == constants.HOOKS_PHASE_POST:
2560       suffix = "post"
2561     else:
2562       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2563     rr = []
2564
2565     subdir = "%s-%s.d" % (hpath, suffix)
2566     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2567     try:
2568       dir_contents = utils.ListVisibleFiles(dir_name)
2569     except OSError, err:
2570       # FIXME: must log output in case of failures
2571       return rr
2572
2573     # we use the standard python sort order,
2574     # so 00name is the recommended naming scheme
2575     dir_contents.sort()
2576     for relname in dir_contents:
2577       fname = os.path.join(dir_name, relname)
2578       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2579           self.RE_MASK.match(relname) is not None):
2580         rrval = constants.HKR_SKIP
2581         output = ""
2582       else:
2583         result, output = self.ExecHook(fname, env)
2584         if not result:
2585           rrval = constants.HKR_FAIL
2586         else:
2587           rrval = constants.HKR_SUCCESS
2588       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2589
2590     return rr
2591
2592
2593 class IAllocatorRunner(object):
2594   """IAllocator runner.
2595
2596   This class is instantiated on the node side (ganeti-noded) and not on
2597   the master side.
2598
2599   """
2600   def Run(self, name, idata):
2601     """Run an iallocator script.
2602
2603     @type name: str
2604     @param name: the iallocator script name
2605     @type idata: str
2606     @param idata: the allocator input data
2607
2608     @rtype: tuple
2609     @return: four element tuple of:
2610        - run status (one of the IARUN_ constants)
2611        - stdout
2612        - stderr
2613        - fail reason (as from L{utils.RunResult})
2614
2615     """
2616     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2617                                   os.path.isfile)
2618     if alloc_script is None:
2619       return (constants.IARUN_NOTFOUND, None, None, None)
2620
2621     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2622     try:
2623       os.write(fd, idata)
2624       os.close(fd)
2625       result = utils.RunCmd([alloc_script, fin_name])
2626       if result.failed:
2627         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2628                 result.fail_reason)
2629     finally:
2630       os.unlink(fin_name)
2631
2632     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2633
2634
2635 class DevCacheManager(object):
2636   """Simple class for managing a cache of block device information.
2637
2638   """
2639   _DEV_PREFIX = "/dev/"
2640   _ROOT_DIR = constants.BDEV_CACHE_DIR
2641
2642   @classmethod
2643   def _ConvertPath(cls, dev_path):
2644     """Converts a /dev/name path to the cache file name.
2645
2646     This replaces slashes with underscores and strips the /dev
2647     prefix. It then returns the full path to the cache file.
2648
2649     @type dev_path: str
2650     @param dev_path: the C{/dev/} path name
2651     @rtype: str
2652     @return: the converted path name
2653
2654     """
2655     if dev_path.startswith(cls._DEV_PREFIX):
2656       dev_path = dev_path[len(cls._DEV_PREFIX):]
2657     dev_path = dev_path.replace("/", "_")
2658     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2659     return fpath
2660
2661   @classmethod
2662   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2663     """Updates the cache information for a given device.
2664
2665     @type dev_path: str
2666     @param dev_path: the pathname of the device
2667     @type owner: str
2668     @param owner: the owner (instance name) of the device
2669     @type on_primary: bool
2670     @param on_primary: whether this is the primary
2671         node nor not
2672     @type iv_name: str
2673     @param iv_name: the instance-visible name of the
2674         device, as in objects.Disk.iv_name
2675
2676     @rtype: None
2677
2678     """
2679     if dev_path is None:
2680       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2681       return
2682     fpath = cls._ConvertPath(dev_path)
2683     if on_primary:
2684       state = "primary"
2685     else:
2686       state = "secondary"
2687     if iv_name is None:
2688       iv_name = "not_visible"
2689     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2690     try:
2691       utils.WriteFile(fpath, data=fdata)
2692     except EnvironmentError, err:
2693       logging.exception("Can't update bdev cache for %s", dev_path)
2694
2695   @classmethod
2696   def RemoveCache(cls, dev_path):
2697     """Remove data for a dev_path.
2698
2699     This is just a wrapper over L{utils.RemoveFile} with a converted
2700     path name and logging.
2701
2702     @type dev_path: str
2703     @param dev_path: the pathname of the device
2704
2705     @rtype: None
2706
2707     """
2708     if dev_path is None:
2709       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2710       return
2711     fpath = cls._ConvertPath(dev_path)
2712     try:
2713       utils.RemoveFile(fpath)
2714     except EnvironmentError, err:
2715       logging.exception("Can't update bdev cache for %s", dev_path)