Add RPC calls to update /etc/hosts
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62
63
64 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
65 _ALLOWED_CLEAN_DIRS = frozenset([
66   constants.DATA_DIR,
67   constants.JOB_QUEUE_ARCHIVE_DIR,
68   constants.QUEUE_DIR,
69   constants.CRYPTO_KEYS_DIR,
70   ])
71 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
72 _X509_KEY_FILE = "key"
73 _X509_CERT_FILE = "cert"
74 _IES_STATUS_FILE = "status"
75 _IES_PID_FILE = "pid"
76 _IES_CA_FILE = "ca"
77
78
79 class RPCFail(Exception):
80   """Class denoting RPC failure.
81
82   Its argument is the error message.
83
84   """
85
86
87 def _Fail(msg, *args, **kwargs):
88   """Log an error and the raise an RPCFail exception.
89
90   This exception is then handled specially in the ganeti daemon and
91   turned into a 'failed' return type. As such, this function is a
92   useful shortcut for logging the error and returning it to the master
93   daemon.
94
95   @type msg: string
96   @param msg: the text of the exception
97   @raise RPCFail
98
99   """
100   if args:
101     msg = msg % args
102   if "log" not in kwargs or kwargs["log"]: # if we should log this error
103     if "exc" in kwargs and kwargs["exc"]:
104       logging.exception(msg)
105     else:
106       logging.error(msg)
107   raise RPCFail(msg)
108
109
110 def _GetConfig():
111   """Simple wrapper to return a SimpleStore.
112
113   @rtype: L{ssconf.SimpleStore}
114   @return: a SimpleStore instance
115
116   """
117   return ssconf.SimpleStore()
118
119
120 def _GetSshRunner(cluster_name):
121   """Simple wrapper to return an SshRunner.
122
123   @type cluster_name: str
124   @param cluster_name: the cluster name, which is needed
125       by the SshRunner constructor
126   @rtype: L{ssh.SshRunner}
127   @return: an SshRunner instance
128
129   """
130   return ssh.SshRunner(cluster_name)
131
132
133 def _Decompress(data):
134   """Unpacks data compressed by the RPC client.
135
136   @type data: list or tuple
137   @param data: Data sent by RPC client
138   @rtype: str
139   @return: Decompressed data
140
141   """
142   assert isinstance(data, (list, tuple))
143   assert len(data) == 2
144   (encoding, content) = data
145   if encoding == constants.RPC_ENCODING_NONE:
146     return content
147   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
148     return zlib.decompress(base64.b64decode(content))
149   else:
150     raise AssertionError("Unknown data encoding")
151
152
153 def _CleanDirectory(path, exclude=None):
154   """Removes all regular files in a directory.
155
156   @type path: str
157   @param path: the directory to clean
158   @type exclude: list
159   @param exclude: list of files to be excluded, defaults
160       to the empty list
161
162   """
163   if path not in _ALLOWED_CLEAN_DIRS:
164     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
165           path)
166
167   if not os.path.isdir(path):
168     return
169   if exclude is None:
170     exclude = []
171   else:
172     # Normalize excluded paths
173     exclude = [os.path.normpath(i) for i in exclude]
174
175   for rel_name in utils.ListVisibleFiles(path):
176     full_name = utils.PathJoin(path, rel_name)
177     if full_name in exclude:
178       continue
179     if os.path.isfile(full_name) and not os.path.islink(full_name):
180       utils.RemoveFile(full_name)
181
182
183 def _BuildUploadFileList():
184   """Build the list of allowed upload files.
185
186   This is abstracted so that it's built only once at module import time.
187
188   """
189   allowed_files = set([
190     constants.CLUSTER_CONF_FILE,
191     constants.ETC_HOSTS,
192     constants.SSH_KNOWN_HOSTS_FILE,
193     constants.VNC_PASSWORD_FILE,
194     constants.RAPI_CERT_FILE,
195     constants.RAPI_USERS_FILE,
196     constants.CONFD_HMAC_KEY,
197     constants.CLUSTER_DOMAIN_SECRET_FILE,
198     ])
199
200   for hv_name in constants.HYPER_TYPES:
201     hv_class = hypervisor.GetHypervisorClass(hv_name)
202     allowed_files.update(hv_class.GetAncillaryFiles())
203
204   return frozenset(allowed_files)
205
206
207 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
208
209
210 def JobQueuePurge():
211   """Removes job queue files and archived jobs.
212
213   @rtype: tuple
214   @return: True, None
215
216   """
217   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
218   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
219
220
221 def GetMasterInfo():
222   """Returns master information.
223
224   This is an utility function to compute master information, either
225   for consumption here or from the node daemon.
226
227   @rtype: tuple
228   @return: master_netdev, master_ip, master_name, primary_ip_family
229   @raise RPCFail: in case of errors
230
231   """
232   try:
233     cfg = _GetConfig()
234     master_netdev = cfg.GetMasterNetdev()
235     master_ip = cfg.GetMasterIP()
236     master_node = cfg.GetMasterNode()
237     primary_ip_family = cfg.GetPrimaryIPFamily()
238   except errors.ConfigurationError, err:
239     _Fail("Cluster configuration incomplete: %s", err, exc=True)
240   return (master_netdev, master_ip, master_node, primary_ip_family)
241
242
243 def StartMaster(start_daemons, no_voting):
244   """Activate local node as master node.
245
246   The function will either try activate the IP address of the master
247   (unless someone else has it) or also start the master daemons, based
248   on the start_daemons parameter.
249
250   @type start_daemons: boolean
251   @param start_daemons: whether to start the master daemons
252       (ganeti-masterd and ganeti-rapi), or (if false) activate the
253       master ip
254   @type no_voting: boolean
255   @param no_voting: whether to start ganeti-masterd without a node vote
256       (if start_daemons is True), but still non-interactively
257   @rtype: None
258
259   """
260   # GetMasterInfo will raise an exception if not able to return data
261   master_netdev, master_ip, _, family = GetMasterInfo()
262
263   err_msgs = []
264   # either start the master and rapi daemons
265   if start_daemons:
266     if no_voting:
267       masterd_args = "--no-voting --yes-do-it"
268     else:
269       masterd_args = ""
270
271     env = {
272       "EXTRA_MASTERD_ARGS": masterd_args,
273       }
274
275     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
276     if result.failed:
277       msg = "Can't start Ganeti master: %s" % result.output
278       logging.error(msg)
279       err_msgs.append(msg)
280   # or activate the IP
281   else:
282     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
283       if netutils.IPAddress.Own(master_ip):
284         # we already have the ip:
285         logging.debug("Master IP already configured, doing nothing")
286       else:
287         msg = "Someone else has the master ip, not activating"
288         logging.error(msg)
289         err_msgs.append(msg)
290     else:
291       ipcls = netutils.IP4Address
292       if family == netutils.IP6Address.family:
293         ipcls = netutils.IP6Address
294
295       result = utils.RunCmd(["ip", "address", "add",
296                              "%s/%d" % (master_ip, ipcls.iplen),
297                              "dev", master_netdev, "label",
298                              "%s:0" % master_netdev])
299       if result.failed:
300         msg = "Can't activate master IP: %s" % result.output
301         logging.error(msg)
302         err_msgs.append(msg)
303
304       # we ignore the exit code of the following cmds
305       if ipcls == netutils.IP4Address:
306         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
307                       master_ip, master_ip])
308       elif ipcls == netutils.IP6Address:
309         utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
310
311   if err_msgs:
312     _Fail("; ".join(err_msgs))
313
314
315 def StopMaster(stop_daemons):
316   """Deactivate this node as master.
317
318   The function will always try to deactivate the IP address of the
319   master. It will also stop the master daemons depending on the
320   stop_daemons parameter.
321
322   @type stop_daemons: boolean
323   @param stop_daemons: whether to also stop the master daemons
324       (ganeti-masterd and ganeti-rapi)
325   @rtype: None
326
327   """
328   # TODO: log and report back to the caller the error failures; we
329   # need to decide in which case we fail the RPC for this
330
331   # GetMasterInfo will raise an exception if not able to return data
332   master_netdev, master_ip, _, family = GetMasterInfo()
333
334   ipcls = netutils.IP4Address
335   if family == netutils.IP6Address.family:
336     ipcls = netutils.IP6Address
337
338   result = utils.RunCmd(["ip", "address", "del",
339                          "%s/%d" % (master_ip, ipcls.iplen),
340                          "dev", master_netdev])
341   if result.failed:
342     logging.error("Can't remove the master IP, error: %s", result.output)
343     # but otherwise ignore the failure
344
345   if stop_daemons:
346     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
347     if result.failed:
348       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
349                     " and error %s",
350                     result.cmd, result.exit_code, result.output)
351
352
353 def EtcHostsModify(mode, host, ip):
354   """Modify a host entry in /etc/hosts.
355
356   @param mode: The mode to operate. Either add or remove entry
357   @param host: The host to operate on
358   @param ip: The ip associated with the entry
359
360   """
361   if mode == constants.ETC_HOSTS_ADD:
362     if not ip:
363       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
364               " present")
365     utils.AddHostToEtcHosts(host, ip)
366   elif mode == constants.ETC_HOSTS_REMOVE:
367     if ip:
368       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
369               " parameter is present")
370     utils.RemoveHostFromEtcHosts(host)
371   else:
372     RPCFail("Mode not supported")
373
374
375 def LeaveCluster(modify_ssh_setup):
376   """Cleans up and remove the current node.
377
378   This function cleans up and prepares the current node to be removed
379   from the cluster.
380
381   If processing is successful, then it raises an
382   L{errors.QuitGanetiException} which is used as a special case to
383   shutdown the node daemon.
384
385   @param modify_ssh_setup: boolean
386
387   """
388   _CleanDirectory(constants.DATA_DIR)
389   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
390   JobQueuePurge()
391
392   if modify_ssh_setup:
393     try:
394       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
395
396       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
397
398       utils.RemoveFile(priv_key)
399       utils.RemoveFile(pub_key)
400     except errors.OpExecError:
401       logging.exception("Error while processing ssh files")
402
403   try:
404     utils.RemoveFile(constants.CONFD_HMAC_KEY)
405     utils.RemoveFile(constants.RAPI_CERT_FILE)
406     utils.RemoveFile(constants.NODED_CERT_FILE)
407   except: # pylint: disable-msg=W0702
408     logging.exception("Error while removing cluster secrets")
409
410   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
411   if result.failed:
412     logging.error("Command %s failed with exitcode %s and error %s",
413                   result.cmd, result.exit_code, result.output)
414
415   # Raise a custom exception (handled in ganeti-noded)
416   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
417
418
419 def GetNodeInfo(vgname, hypervisor_type):
420   """Gives back a hash with different information about the node.
421
422   @type vgname: C{string}
423   @param vgname: the name of the volume group to ask for disk space information
424   @type hypervisor_type: C{str}
425   @param hypervisor_type: the name of the hypervisor to ask for
426       memory information
427   @rtype: C{dict}
428   @return: dictionary with the following keys:
429       - vg_size is the size of the configured volume group in MiB
430       - vg_free is the free size of the volume group in MiB
431       - memory_dom0 is the memory allocated for domain0 in MiB
432       - memory_free is the currently available (free) ram in MiB
433       - memory_total is the total number of ram in MiB
434
435   """
436   outputarray = {}
437   vginfo = _GetVGInfo(vgname)
438   outputarray['vg_size'] = vginfo['vg_size']
439   outputarray['vg_free'] = vginfo['vg_free']
440
441   hyper = hypervisor.GetHypervisor(hypervisor_type)
442   hyp_info = hyper.GetNodeInfo()
443   if hyp_info is not None:
444     outputarray.update(hyp_info)
445
446   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
447
448   return outputarray
449
450
451 def VerifyNode(what, cluster_name):
452   """Verify the status of the local node.
453
454   Based on the input L{what} parameter, various checks are done on the
455   local node.
456
457   If the I{filelist} key is present, this list of
458   files is checksummed and the file/checksum pairs are returned.
459
460   If the I{nodelist} key is present, we check that we have
461   connectivity via ssh with the target nodes (and check the hostname
462   report).
463
464   If the I{node-net-test} key is present, we check that we have
465   connectivity to the given nodes via both primary IP and, if
466   applicable, secondary IPs.
467
468   @type what: C{dict}
469   @param what: a dictionary of things to check:
470       - filelist: list of files for which to compute checksums
471       - nodelist: list of nodes we should check ssh communication with
472       - node-net-test: list of nodes we should check node daemon port
473         connectivity with
474       - hypervisor: list with hypervisors to run the verify for
475   @rtype: dict
476   @return: a dictionary with the same keys as the input dict, and
477       values representing the result of the checks
478
479   """
480   result = {}
481   my_name = netutils.Hostname.GetSysName()
482   port = netutils.GetDaemonPort(constants.NODED)
483
484   if constants.NV_HYPERVISOR in what:
485     result[constants.NV_HYPERVISOR] = tmp = {}
486     for hv_name in what[constants.NV_HYPERVISOR]:
487       try:
488         val = hypervisor.GetHypervisor(hv_name).Verify()
489       except errors.HypervisorError, err:
490         val = "Error while checking hypervisor: %s" % str(err)
491       tmp[hv_name] = val
492
493   if constants.NV_FILELIST in what:
494     result[constants.NV_FILELIST] = utils.FingerprintFiles(
495       what[constants.NV_FILELIST])
496
497   if constants.NV_NODELIST in what:
498     result[constants.NV_NODELIST] = tmp = {}
499     random.shuffle(what[constants.NV_NODELIST])
500     for node in what[constants.NV_NODELIST]:
501       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
502       if not success:
503         tmp[node] = message
504
505   if constants.NV_NODENETTEST in what:
506     result[constants.NV_NODENETTEST] = tmp = {}
507     my_pip = my_sip = None
508     for name, pip, sip in what[constants.NV_NODENETTEST]:
509       if name == my_name:
510         my_pip = pip
511         my_sip = sip
512         break
513     if not my_pip:
514       tmp[my_name] = ("Can't find my own primary/secondary IP"
515                       " in the node list")
516     else:
517       for name, pip, sip in what[constants.NV_NODENETTEST]:
518         fail = []
519         if not netutils.TcpPing(pip, port, source=my_pip):
520           fail.append("primary")
521         if sip != pip:
522           if not netutils.TcpPing(sip, port, source=my_sip):
523             fail.append("secondary")
524         if fail:
525           tmp[name] = ("failure using the %s interface(s)" %
526                        " and ".join(fail))
527
528   if constants.NV_MASTERIP in what:
529     # FIXME: add checks on incoming data structures (here and in the
530     # rest of the function)
531     master_name, master_ip = what[constants.NV_MASTERIP]
532     if master_name == my_name:
533       source = constants.IP4_ADDRESS_LOCALHOST
534     else:
535       source = None
536     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
537                                                   source=source)
538
539   if constants.NV_LVLIST in what:
540     try:
541       val = GetVolumeList(what[constants.NV_LVLIST])
542     except RPCFail, err:
543       val = str(err)
544     result[constants.NV_LVLIST] = val
545
546   if constants.NV_INSTANCELIST in what:
547     # GetInstanceList can fail
548     try:
549       val = GetInstanceList(what[constants.NV_INSTANCELIST])
550     except RPCFail, err:
551       val = str(err)
552     result[constants.NV_INSTANCELIST] = val
553
554   if constants.NV_VGLIST in what:
555     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
556
557   if constants.NV_PVLIST in what:
558     result[constants.NV_PVLIST] = \
559       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
560                                    filter_allocatable=False)
561
562   if constants.NV_VERSION in what:
563     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
564                                     constants.RELEASE_VERSION)
565
566   if constants.NV_HVINFO in what:
567     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
568     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
569
570   if constants.NV_DRBDLIST in what:
571     try:
572       used_minors = bdev.DRBD8.GetUsedDevs().keys()
573     except errors.BlockDeviceError, err:
574       logging.warning("Can't get used minors list", exc_info=True)
575       used_minors = str(err)
576     result[constants.NV_DRBDLIST] = used_minors
577
578   if constants.NV_DRBDHELPER in what:
579     status = True
580     try:
581       payload = bdev.BaseDRBD.GetUsermodeHelper()
582     except errors.BlockDeviceError, err:
583       logging.error("Can't get DRBD usermode helper: %s", str(err))
584       status = False
585       payload = str(err)
586     result[constants.NV_DRBDHELPER] = (status, payload)
587
588   if constants.NV_NODESETUP in what:
589     result[constants.NV_NODESETUP] = tmpr = []
590     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
591       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
592                   " under /sys, missing required directories /sys/block"
593                   " and /sys/class/net")
594     if (not os.path.isdir("/proc/sys") or
595         not os.path.isfile("/proc/sysrq-trigger")):
596       tmpr.append("The procfs filesystem doesn't seem to be mounted"
597                   " under /proc, missing required directory /proc/sys and"
598                   " the file /proc/sysrq-trigger")
599
600   if constants.NV_TIME in what:
601     result[constants.NV_TIME] = utils.SplitTime(time.time())
602
603   if constants.NV_OSLIST in what:
604     result[constants.NV_OSLIST] = DiagnoseOS()
605
606   return result
607
608
609 def GetVolumeList(vg_name):
610   """Compute list of logical volumes and their size.
611
612   @type vg_name: str
613   @param vg_name: the volume group whose LVs we should list
614   @rtype: dict
615   @return:
616       dictionary of all partions (key) with value being a tuple of
617       their size (in MiB), inactive and online status::
618
619         {'test1': ('20.06', True, True)}
620
621       in case of errors, a string is returned with the error
622       details.
623
624   """
625   lvs = {}
626   sep = '|'
627   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
628                          "--separator=%s" % sep,
629                          "-olv_name,lv_size,lv_attr", vg_name])
630   if result.failed:
631     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
632
633   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
634   for line in result.stdout.splitlines():
635     line = line.strip()
636     match = valid_line_re.match(line)
637     if not match:
638       logging.error("Invalid line returned from lvs output: '%s'", line)
639       continue
640     name, size, attr = match.groups()
641     inactive = attr[4] == '-'
642     online = attr[5] == 'o'
643     virtual = attr[0] == 'v'
644     if virtual:
645       # we don't want to report such volumes as existing, since they
646       # don't really hold data
647       continue
648     lvs[name] = (size, inactive, online)
649
650   return lvs
651
652
653 def ListVolumeGroups():
654   """List the volume groups and their size.
655
656   @rtype: dict
657   @return: dictionary with keys volume name and values the
658       size of the volume
659
660   """
661   return utils.ListVolumeGroups()
662
663
664 def NodeVolumes():
665   """List all volumes on this node.
666
667   @rtype: list
668   @return:
669     A list of dictionaries, each having four keys:
670       - name: the logical volume name,
671       - size: the size of the logical volume
672       - dev: the physical device on which the LV lives
673       - vg: the volume group to which it belongs
674
675     In case of errors, we return an empty list and log the
676     error.
677
678     Note that since a logical volume can live on multiple physical
679     volumes, the resulting list might include a logical volume
680     multiple times.
681
682   """
683   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
684                          "--separator=|",
685                          "--options=lv_name,lv_size,devices,vg_name"])
686   if result.failed:
687     _Fail("Failed to list logical volumes, lvs output: %s",
688           result.output)
689
690   def parse_dev(dev):
691     return dev.split('(')[0]
692
693   def handle_dev(dev):
694     return [parse_dev(x) for x in dev.split(",")]
695
696   def map_line(line):
697     line = [v.strip() for v in line]
698     return [{'name': line[0], 'size': line[1],
699              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
700
701   all_devs = []
702   for line in result.stdout.splitlines():
703     if line.count('|') >= 3:
704       all_devs.extend(map_line(line.split('|')))
705     else:
706       logging.warning("Strange line in the output from lvs: '%s'", line)
707   return all_devs
708
709
710 def BridgesExist(bridges_list):
711   """Check if a list of bridges exist on the current node.
712
713   @rtype: boolean
714   @return: C{True} if all of them exist, C{False} otherwise
715
716   """
717   missing = []
718   for bridge in bridges_list:
719     if not utils.BridgeExists(bridge):
720       missing.append(bridge)
721
722   if missing:
723     _Fail("Missing bridges %s", utils.CommaJoin(missing))
724
725
726 def GetInstanceList(hypervisor_list):
727   """Provides a list of instances.
728
729   @type hypervisor_list: list
730   @param hypervisor_list: the list of hypervisors to query information
731
732   @rtype: list
733   @return: a list of all running instances on the current node
734     - instance1.example.com
735     - instance2.example.com
736
737   """
738   results = []
739   for hname in hypervisor_list:
740     try:
741       names = hypervisor.GetHypervisor(hname).ListInstances()
742       results.extend(names)
743     except errors.HypervisorError, err:
744       _Fail("Error enumerating instances (hypervisor %s): %s",
745             hname, err, exc=True)
746
747   return results
748
749
750 def GetInstanceInfo(instance, hname):
751   """Gives back the information about an instance as a dictionary.
752
753   @type instance: string
754   @param instance: the instance name
755   @type hname: string
756   @param hname: the hypervisor type of the instance
757
758   @rtype: dict
759   @return: dictionary with the following keys:
760       - memory: memory size of instance (int)
761       - state: xen state of instance (string)
762       - time: cpu time of instance (float)
763
764   """
765   output = {}
766
767   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
768   if iinfo is not None:
769     output['memory'] = iinfo[2]
770     output['state'] = iinfo[4]
771     output['time'] = iinfo[5]
772
773   return output
774
775
776 def GetInstanceMigratable(instance):
777   """Gives whether an instance can be migrated.
778
779   @type instance: L{objects.Instance}
780   @param instance: object representing the instance to be checked.
781
782   @rtype: tuple
783   @return: tuple of (result, description) where:
784       - result: whether the instance can be migrated or not
785       - description: a description of the issue, if relevant
786
787   """
788   hyper = hypervisor.GetHypervisor(instance.hypervisor)
789   iname = instance.name
790   if iname not in hyper.ListInstances():
791     _Fail("Instance %s is not running", iname)
792
793   for idx in range(len(instance.disks)):
794     link_name = _GetBlockDevSymlinkPath(iname, idx)
795     if not os.path.islink(link_name):
796       logging.warning("Instance %s is missing symlink %s for disk %d",
797                       iname, link_name, idx)
798
799
800 def GetAllInstancesInfo(hypervisor_list):
801   """Gather data about all instances.
802
803   This is the equivalent of L{GetInstanceInfo}, except that it
804   computes data for all instances at once, thus being faster if one
805   needs data about more than one instance.
806
807   @type hypervisor_list: list
808   @param hypervisor_list: list of hypervisors to query for instance data
809
810   @rtype: dict
811   @return: dictionary of instance: data, with data having the following keys:
812       - memory: memory size of instance (int)
813       - state: xen state of instance (string)
814       - time: cpu time of instance (float)
815       - vcpus: the number of vcpus
816
817   """
818   output = {}
819
820   for hname in hypervisor_list:
821     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
822     if iinfo:
823       for name, _, memory, vcpus, state, times in iinfo:
824         value = {
825           'memory': memory,
826           'vcpus': vcpus,
827           'state': state,
828           'time': times,
829           }
830         if name in output:
831           # we only check static parameters, like memory and vcpus,
832           # and not state and time which can change between the
833           # invocations of the different hypervisors
834           for key in 'memory', 'vcpus':
835             if value[key] != output[name][key]:
836               _Fail("Instance %s is running twice"
837                     " with different parameters", name)
838         output[name] = value
839
840   return output
841
842
843 def _InstanceLogName(kind, os_name, instance):
844   """Compute the OS log filename for a given instance and operation.
845
846   The instance name and os name are passed in as strings since not all
847   operations have these as part of an instance object.
848
849   @type kind: string
850   @param kind: the operation type (e.g. add, import, etc.)
851   @type os_name: string
852   @param os_name: the os name
853   @type instance: string
854   @param instance: the name of the instance being imported/added/etc.
855
856   """
857   # TODO: Use tempfile.mkstemp to create unique filename
858   base = ("%s-%s-%s-%s.log" %
859           (kind, os_name, instance, utils.TimestampForFilename()))
860   return utils.PathJoin(constants.LOG_OS_DIR, base)
861
862
863 def InstanceOsAdd(instance, reinstall, debug):
864   """Add an OS to an instance.
865
866   @type instance: L{objects.Instance}
867   @param instance: Instance whose OS is to be installed
868   @type reinstall: boolean
869   @param reinstall: whether this is an instance reinstall
870   @type debug: integer
871   @param debug: debug level, passed to the OS scripts
872   @rtype: None
873
874   """
875   inst_os = OSFromDisk(instance.os)
876
877   create_env = OSEnvironment(instance, inst_os, debug)
878   if reinstall:
879     create_env['INSTANCE_REINSTALL'] = "1"
880
881   logfile = _InstanceLogName("add", instance.os, instance.name)
882
883   result = utils.RunCmd([inst_os.create_script], env=create_env,
884                         cwd=inst_os.path, output=logfile,)
885   if result.failed:
886     logging.error("os create command '%s' returned error: %s, logfile: %s,"
887                   " output: %s", result.cmd, result.fail_reason, logfile,
888                   result.output)
889     lines = [utils.SafeEncode(val)
890              for val in utils.TailFile(logfile, lines=20)]
891     _Fail("OS create script failed (%s), last lines in the"
892           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
893
894
895 def RunRenameInstance(instance, old_name, debug):
896   """Run the OS rename script for an instance.
897
898   @type instance: L{objects.Instance}
899   @param instance: Instance whose OS is to be installed
900   @type old_name: string
901   @param old_name: previous instance name
902   @type debug: integer
903   @param debug: debug level, passed to the OS scripts
904   @rtype: boolean
905   @return: the success of the operation
906
907   """
908   inst_os = OSFromDisk(instance.os)
909
910   rename_env = OSEnvironment(instance, inst_os, debug)
911   rename_env['OLD_INSTANCE_NAME'] = old_name
912
913   logfile = _InstanceLogName("rename", instance.os,
914                              "%s-%s" % (old_name, instance.name))
915
916   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
917                         cwd=inst_os.path, output=logfile)
918
919   if result.failed:
920     logging.error("os create command '%s' returned error: %s output: %s",
921                   result.cmd, result.fail_reason, result.output)
922     lines = [utils.SafeEncode(val)
923              for val in utils.TailFile(logfile, lines=20)]
924     _Fail("OS rename script failed (%s), last lines in the"
925           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
926
927
928 def _GetVGInfo(vg_name):
929   """Get information about the volume group.
930
931   @type vg_name: str
932   @param vg_name: the volume group which we query
933   @rtype: dict
934   @return:
935     A dictionary with the following keys:
936       - C{vg_size} is the total size of the volume group in MiB
937       - C{vg_free} is the free size of the volume group in MiB
938       - C{pv_count} are the number of physical disks in that VG
939
940     If an error occurs during gathering of data, we return the same dict
941     with keys all set to None.
942
943   """
944   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
945
946   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
947                          "--nosuffix", "--units=m", "--separator=:", vg_name])
948
949   if retval.failed:
950     logging.error("volume group %s not present", vg_name)
951     return retdic
952   valarr = retval.stdout.strip().rstrip(':').split(':')
953   if len(valarr) == 3:
954     try:
955       retdic = {
956         "vg_size": int(round(float(valarr[0]), 0)),
957         "vg_free": int(round(float(valarr[1]), 0)),
958         "pv_count": int(valarr[2]),
959         }
960     except (TypeError, ValueError), err:
961       logging.exception("Fail to parse vgs output: %s", err)
962   else:
963     logging.error("vgs output has the wrong number of fields (expected"
964                   " three): %s", str(valarr))
965   return retdic
966
967
968 def _GetBlockDevSymlinkPath(instance_name, idx):
969   return utils.PathJoin(constants.DISK_LINKS_DIR,
970                         "%s:%d" % (instance_name, idx))
971
972
973 def _SymlinkBlockDev(instance_name, device_path, idx):
974   """Set up symlinks to a instance's block device.
975
976   This is an auxiliary function run when an instance is start (on the primary
977   node) or when an instance is migrated (on the target node).
978
979
980   @param instance_name: the name of the target instance
981   @param device_path: path of the physical block device, on the node
982   @param idx: the disk index
983   @return: absolute path to the disk's symlink
984
985   """
986   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
987   try:
988     os.symlink(device_path, link_name)
989   except OSError, err:
990     if err.errno == errno.EEXIST:
991       if (not os.path.islink(link_name) or
992           os.readlink(link_name) != device_path):
993         os.remove(link_name)
994         os.symlink(device_path, link_name)
995     else:
996       raise
997
998   return link_name
999
1000
1001 def _RemoveBlockDevLinks(instance_name, disks):
1002   """Remove the block device symlinks belonging to the given instance.
1003
1004   """
1005   for idx, _ in enumerate(disks):
1006     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1007     if os.path.islink(link_name):
1008       try:
1009         os.remove(link_name)
1010       except OSError:
1011         logging.exception("Can't remove symlink '%s'", link_name)
1012
1013
1014 def _GatherAndLinkBlockDevs(instance):
1015   """Set up an instance's block device(s).
1016
1017   This is run on the primary node at instance startup. The block
1018   devices must be already assembled.
1019
1020   @type instance: L{objects.Instance}
1021   @param instance: the instance whose disks we shoul assemble
1022   @rtype: list
1023   @return: list of (disk_object, device_path)
1024
1025   """
1026   block_devices = []
1027   for idx, disk in enumerate(instance.disks):
1028     device = _RecursiveFindBD(disk)
1029     if device is None:
1030       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1031                                     str(disk))
1032     device.Open()
1033     try:
1034       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1035     except OSError, e:
1036       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1037                                     e.strerror)
1038
1039     block_devices.append((disk, link_name))
1040
1041   return block_devices
1042
1043
1044 def StartInstance(instance):
1045   """Start an instance.
1046
1047   @type instance: L{objects.Instance}
1048   @param instance: the instance object
1049   @rtype: None
1050
1051   """
1052   running_instances = GetInstanceList([instance.hypervisor])
1053
1054   if instance.name in running_instances:
1055     logging.info("Instance %s already running, not starting", instance.name)
1056     return
1057
1058   try:
1059     block_devices = _GatherAndLinkBlockDevs(instance)
1060     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1061     hyper.StartInstance(instance, block_devices)
1062   except errors.BlockDeviceError, err:
1063     _Fail("Block device error: %s", err, exc=True)
1064   except errors.HypervisorError, err:
1065     _RemoveBlockDevLinks(instance.name, instance.disks)
1066     _Fail("Hypervisor error: %s", err, exc=True)
1067
1068
1069 def InstanceShutdown(instance, timeout):
1070   """Shut an instance down.
1071
1072   @note: this functions uses polling with a hardcoded timeout.
1073
1074   @type instance: L{objects.Instance}
1075   @param instance: the instance object
1076   @type timeout: integer
1077   @param timeout: maximum timeout for soft shutdown
1078   @rtype: None
1079
1080   """
1081   hv_name = instance.hypervisor
1082   hyper = hypervisor.GetHypervisor(hv_name)
1083   iname = instance.name
1084
1085   if instance.name not in hyper.ListInstances():
1086     logging.info("Instance %s not running, doing nothing", iname)
1087     return
1088
1089   class _TryShutdown:
1090     def __init__(self):
1091       self.tried_once = False
1092
1093     def __call__(self):
1094       if iname not in hyper.ListInstances():
1095         return
1096
1097       try:
1098         hyper.StopInstance(instance, retry=self.tried_once)
1099       except errors.HypervisorError, err:
1100         if iname not in hyper.ListInstances():
1101           # if the instance is no longer existing, consider this a
1102           # success and go to cleanup
1103           return
1104
1105         _Fail("Failed to stop instance %s: %s", iname, err)
1106
1107       self.tried_once = True
1108
1109       raise utils.RetryAgain()
1110
1111   try:
1112     utils.Retry(_TryShutdown(), 5, timeout)
1113   except utils.RetryTimeout:
1114     # the shutdown did not succeed
1115     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1116
1117     try:
1118       hyper.StopInstance(instance, force=True)
1119     except errors.HypervisorError, err:
1120       if iname in hyper.ListInstances():
1121         # only raise an error if the instance still exists, otherwise
1122         # the error could simply be "instance ... unknown"!
1123         _Fail("Failed to force stop instance %s: %s", iname, err)
1124
1125     time.sleep(1)
1126
1127     if iname in hyper.ListInstances():
1128       _Fail("Could not shutdown instance %s even by destroy", iname)
1129
1130   try:
1131     hyper.CleanupInstance(instance.name)
1132   except errors.HypervisorError, err:
1133     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1134
1135   _RemoveBlockDevLinks(iname, instance.disks)
1136
1137
1138 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1139   """Reboot an instance.
1140
1141   @type instance: L{objects.Instance}
1142   @param instance: the instance object to reboot
1143   @type reboot_type: str
1144   @param reboot_type: the type of reboot, one the following
1145     constants:
1146       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1147         instance OS, do not recreate the VM
1148       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1149         restart the VM (at the hypervisor level)
1150       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1151         not accepted here, since that mode is handled differently, in
1152         cmdlib, and translates into full stop and start of the
1153         instance (instead of a call_instance_reboot RPC)
1154   @type shutdown_timeout: integer
1155   @param shutdown_timeout: maximum timeout for soft shutdown
1156   @rtype: None
1157
1158   """
1159   running_instances = GetInstanceList([instance.hypervisor])
1160
1161   if instance.name not in running_instances:
1162     _Fail("Cannot reboot instance %s that is not running", instance.name)
1163
1164   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1165   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1166     try:
1167       hyper.RebootInstance(instance)
1168     except errors.HypervisorError, err:
1169       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1170   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1171     try:
1172       InstanceShutdown(instance, shutdown_timeout)
1173       return StartInstance(instance)
1174     except errors.HypervisorError, err:
1175       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1176   else:
1177     _Fail("Invalid reboot_type received: %s", reboot_type)
1178
1179
1180 def MigrationInfo(instance):
1181   """Gather information about an instance to be migrated.
1182
1183   @type instance: L{objects.Instance}
1184   @param instance: the instance definition
1185
1186   """
1187   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1188   try:
1189     info = hyper.MigrationInfo(instance)
1190   except errors.HypervisorError, err:
1191     _Fail("Failed to fetch migration information: %s", err, exc=True)
1192   return info
1193
1194
1195 def AcceptInstance(instance, info, target):
1196   """Prepare the node to accept an instance.
1197
1198   @type instance: L{objects.Instance}
1199   @param instance: the instance definition
1200   @type info: string/data (opaque)
1201   @param info: migration information, from the source node
1202   @type target: string
1203   @param target: target host (usually ip), on this node
1204
1205   """
1206   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1207   try:
1208     hyper.AcceptInstance(instance, info, target)
1209   except errors.HypervisorError, err:
1210     _Fail("Failed to accept instance: %s", err, exc=True)
1211
1212
1213 def FinalizeMigration(instance, info, success):
1214   """Finalize any preparation to accept an instance.
1215
1216   @type instance: L{objects.Instance}
1217   @param instance: the instance definition
1218   @type info: string/data (opaque)
1219   @param info: migration information, from the source node
1220   @type success: boolean
1221   @param success: whether the migration was a success or a failure
1222
1223   """
1224   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1225   try:
1226     hyper.FinalizeMigration(instance, info, success)
1227   except errors.HypervisorError, err:
1228     _Fail("Failed to finalize migration: %s", err, exc=True)
1229
1230
1231 def MigrateInstance(instance, target, live):
1232   """Migrates an instance to another node.
1233
1234   @type instance: L{objects.Instance}
1235   @param instance: the instance definition
1236   @type target: string
1237   @param target: the target node name
1238   @type live: boolean
1239   @param live: whether the migration should be done live or not (the
1240       interpretation of this parameter is left to the hypervisor)
1241   @rtype: tuple
1242   @return: a tuple of (success, msg) where:
1243       - succes is a boolean denoting the success/failure of the operation
1244       - msg is a string with details in case of failure
1245
1246   """
1247   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1248
1249   try:
1250     hyper.MigrateInstance(instance, target, live)
1251   except errors.HypervisorError, err:
1252     _Fail("Failed to migrate instance: %s", err, exc=True)
1253
1254
1255 def BlockdevCreate(disk, size, owner, on_primary, info):
1256   """Creates a block device for an instance.
1257
1258   @type disk: L{objects.Disk}
1259   @param disk: the object describing the disk we should create
1260   @type size: int
1261   @param size: the size of the physical underlying device, in MiB
1262   @type owner: str
1263   @param owner: the name of the instance for which disk is created,
1264       used for device cache data
1265   @type on_primary: boolean
1266   @param on_primary:  indicates if it is the primary node or not
1267   @type info: string
1268   @param info: string that will be sent to the physical device
1269       creation, used for example to set (LVM) tags on LVs
1270
1271   @return: the new unique_id of the device (this can sometime be
1272       computed only after creation), or None. On secondary nodes,
1273       it's not required to return anything.
1274
1275   """
1276   # TODO: remove the obsolete 'size' argument
1277   # pylint: disable-msg=W0613
1278   clist = []
1279   if disk.children:
1280     for child in disk.children:
1281       try:
1282         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1283       except errors.BlockDeviceError, err:
1284         _Fail("Can't assemble device %s: %s", child, err)
1285       if on_primary or disk.AssembleOnSecondary():
1286         # we need the children open in case the device itself has to
1287         # be assembled
1288         try:
1289           # pylint: disable-msg=E1103
1290           crdev.Open()
1291         except errors.BlockDeviceError, err:
1292           _Fail("Can't make child '%s' read-write: %s", child, err)
1293       clist.append(crdev)
1294
1295   try:
1296     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1297   except errors.BlockDeviceError, err:
1298     _Fail("Can't create block device: %s", err)
1299
1300   if on_primary or disk.AssembleOnSecondary():
1301     try:
1302       device.Assemble()
1303     except errors.BlockDeviceError, err:
1304       _Fail("Can't assemble device after creation, unusual event: %s", err)
1305     device.SetSyncSpeed(constants.SYNC_SPEED)
1306     if on_primary or disk.OpenOnSecondary():
1307       try:
1308         device.Open(force=True)
1309       except errors.BlockDeviceError, err:
1310         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1311     DevCacheManager.UpdateCache(device.dev_path, owner,
1312                                 on_primary, disk.iv_name)
1313
1314   device.SetInfo(info)
1315
1316   return device.unique_id
1317
1318
1319 def BlockdevRemove(disk):
1320   """Remove a block device.
1321
1322   @note: This is intended to be called recursively.
1323
1324   @type disk: L{objects.Disk}
1325   @param disk: the disk object we should remove
1326   @rtype: boolean
1327   @return: the success of the operation
1328
1329   """
1330   msgs = []
1331   try:
1332     rdev = _RecursiveFindBD(disk)
1333   except errors.BlockDeviceError, err:
1334     # probably can't attach
1335     logging.info("Can't attach to device %s in remove", disk)
1336     rdev = None
1337   if rdev is not None:
1338     r_path = rdev.dev_path
1339     try:
1340       rdev.Remove()
1341     except errors.BlockDeviceError, err:
1342       msgs.append(str(err))
1343     if not msgs:
1344       DevCacheManager.RemoveCache(r_path)
1345
1346   if disk.children:
1347     for child in disk.children:
1348       try:
1349         BlockdevRemove(child)
1350       except RPCFail, err:
1351         msgs.append(str(err))
1352
1353   if msgs:
1354     _Fail("; ".join(msgs))
1355
1356
1357 def _RecursiveAssembleBD(disk, owner, as_primary):
1358   """Activate a block device for an instance.
1359
1360   This is run on the primary and secondary nodes for an instance.
1361
1362   @note: this function is called recursively.
1363
1364   @type disk: L{objects.Disk}
1365   @param disk: the disk we try to assemble
1366   @type owner: str
1367   @param owner: the name of the instance which owns the disk
1368   @type as_primary: boolean
1369   @param as_primary: if we should make the block device
1370       read/write
1371
1372   @return: the assembled device or None (in case no device
1373       was assembled)
1374   @raise errors.BlockDeviceError: in case there is an error
1375       during the activation of the children or the device
1376       itself
1377
1378   """
1379   children = []
1380   if disk.children:
1381     mcn = disk.ChildrenNeeded()
1382     if mcn == -1:
1383       mcn = 0 # max number of Nones allowed
1384     else:
1385       mcn = len(disk.children) - mcn # max number of Nones
1386     for chld_disk in disk.children:
1387       try:
1388         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1389       except errors.BlockDeviceError, err:
1390         if children.count(None) >= mcn:
1391           raise
1392         cdev = None
1393         logging.error("Error in child activation (but continuing): %s",
1394                       str(err))
1395       children.append(cdev)
1396
1397   if as_primary or disk.AssembleOnSecondary():
1398     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1399     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1400     result = r_dev
1401     if as_primary or disk.OpenOnSecondary():
1402       r_dev.Open()
1403     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1404                                 as_primary, disk.iv_name)
1405
1406   else:
1407     result = True
1408   return result
1409
1410
1411 def BlockdevAssemble(disk, owner, as_primary):
1412   """Activate a block device for an instance.
1413
1414   This is a wrapper over _RecursiveAssembleBD.
1415
1416   @rtype: str or boolean
1417   @return: a C{/dev/...} path for primary nodes, and
1418       C{True} for secondary nodes
1419
1420   """
1421   try:
1422     result = _RecursiveAssembleBD(disk, owner, as_primary)
1423     if isinstance(result, bdev.BlockDev):
1424       # pylint: disable-msg=E1103
1425       result = result.dev_path
1426   except errors.BlockDeviceError, err:
1427     _Fail("Error while assembling disk: %s", err, exc=True)
1428
1429   return result
1430
1431
1432 def BlockdevShutdown(disk):
1433   """Shut down a block device.
1434
1435   First, if the device is assembled (Attach() is successful), then
1436   the device is shutdown. Then the children of the device are
1437   shutdown.
1438
1439   This function is called recursively. Note that we don't cache the
1440   children or such, as oppossed to assemble, shutdown of different
1441   devices doesn't require that the upper device was active.
1442
1443   @type disk: L{objects.Disk}
1444   @param disk: the description of the disk we should
1445       shutdown
1446   @rtype: None
1447
1448   """
1449   msgs = []
1450   r_dev = _RecursiveFindBD(disk)
1451   if r_dev is not None:
1452     r_path = r_dev.dev_path
1453     try:
1454       r_dev.Shutdown()
1455       DevCacheManager.RemoveCache(r_path)
1456     except errors.BlockDeviceError, err:
1457       msgs.append(str(err))
1458
1459   if disk.children:
1460     for child in disk.children:
1461       try:
1462         BlockdevShutdown(child)
1463       except RPCFail, err:
1464         msgs.append(str(err))
1465
1466   if msgs:
1467     _Fail("; ".join(msgs))
1468
1469
1470 def BlockdevAddchildren(parent_cdev, new_cdevs):
1471   """Extend a mirrored block device.
1472
1473   @type parent_cdev: L{objects.Disk}
1474   @param parent_cdev: the disk to which we should add children
1475   @type new_cdevs: list of L{objects.Disk}
1476   @param new_cdevs: the list of children which we should add
1477   @rtype: None
1478
1479   """
1480   parent_bdev = _RecursiveFindBD(parent_cdev)
1481   if parent_bdev is None:
1482     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1483   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1484   if new_bdevs.count(None) > 0:
1485     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1486   parent_bdev.AddChildren(new_bdevs)
1487
1488
1489 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1490   """Shrink a mirrored block device.
1491
1492   @type parent_cdev: L{objects.Disk}
1493   @param parent_cdev: the disk from which we should remove children
1494   @type new_cdevs: list of L{objects.Disk}
1495   @param new_cdevs: the list of children which we should remove
1496   @rtype: None
1497
1498   """
1499   parent_bdev = _RecursiveFindBD(parent_cdev)
1500   if parent_bdev is None:
1501     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1502   devs = []
1503   for disk in new_cdevs:
1504     rpath = disk.StaticDevPath()
1505     if rpath is None:
1506       bd = _RecursiveFindBD(disk)
1507       if bd is None:
1508         _Fail("Can't find device %s while removing children", disk)
1509       else:
1510         devs.append(bd.dev_path)
1511     else:
1512       if not utils.IsNormAbsPath(rpath):
1513         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1514       devs.append(rpath)
1515   parent_bdev.RemoveChildren(devs)
1516
1517
1518 def BlockdevGetmirrorstatus(disks):
1519   """Get the mirroring status of a list of devices.
1520
1521   @type disks: list of L{objects.Disk}
1522   @param disks: the list of disks which we should query
1523   @rtype: disk
1524   @return:
1525       a list of (mirror_done, estimated_time) tuples, which
1526       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1527   @raise errors.BlockDeviceError: if any of the disks cannot be
1528       found
1529
1530   """
1531   stats = []
1532   for dsk in disks:
1533     rbd = _RecursiveFindBD(dsk)
1534     if rbd is None:
1535       _Fail("Can't find device %s", dsk)
1536
1537     stats.append(rbd.CombinedSyncStatus())
1538
1539   return stats
1540
1541
1542 def _RecursiveFindBD(disk):
1543   """Check if a device is activated.
1544
1545   If so, return information about the real device.
1546
1547   @type disk: L{objects.Disk}
1548   @param disk: the disk object we need to find
1549
1550   @return: None if the device can't be found,
1551       otherwise the device instance
1552
1553   """
1554   children = []
1555   if disk.children:
1556     for chdisk in disk.children:
1557       children.append(_RecursiveFindBD(chdisk))
1558
1559   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1560
1561
1562 def _OpenRealBD(disk):
1563   """Opens the underlying block device of a disk.
1564
1565   @type disk: L{objects.Disk}
1566   @param disk: the disk object we want to open
1567
1568   """
1569   real_disk = _RecursiveFindBD(disk)
1570   if real_disk is None:
1571     _Fail("Block device '%s' is not set up", disk)
1572
1573   real_disk.Open()
1574
1575   return real_disk
1576
1577
1578 def BlockdevFind(disk):
1579   """Check if a device is activated.
1580
1581   If it is, return information about the real device.
1582
1583   @type disk: L{objects.Disk}
1584   @param disk: the disk to find
1585   @rtype: None or objects.BlockDevStatus
1586   @return: None if the disk cannot be found, otherwise a the current
1587            information
1588
1589   """
1590   try:
1591     rbd = _RecursiveFindBD(disk)
1592   except errors.BlockDeviceError, err:
1593     _Fail("Failed to find device: %s", err, exc=True)
1594
1595   if rbd is None:
1596     return None
1597
1598   return rbd.GetSyncStatus()
1599
1600
1601 def BlockdevGetsize(disks):
1602   """Computes the size of the given disks.
1603
1604   If a disk is not found, returns None instead.
1605
1606   @type disks: list of L{objects.Disk}
1607   @param disks: the list of disk to compute the size for
1608   @rtype: list
1609   @return: list with elements None if the disk cannot be found,
1610       otherwise the size
1611
1612   """
1613   result = []
1614   for cf in disks:
1615     try:
1616       rbd = _RecursiveFindBD(cf)
1617     except errors.BlockDeviceError:
1618       result.append(None)
1619       continue
1620     if rbd is None:
1621       result.append(None)
1622     else:
1623       result.append(rbd.GetActualSize())
1624   return result
1625
1626
1627 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1628   """Export a block device to a remote node.
1629
1630   @type disk: L{objects.Disk}
1631   @param disk: the description of the disk to export
1632   @type dest_node: str
1633   @param dest_node: the destination node to export to
1634   @type dest_path: str
1635   @param dest_path: the destination path on the target node
1636   @type cluster_name: str
1637   @param cluster_name: the cluster name, needed for SSH hostalias
1638   @rtype: None
1639
1640   """
1641   real_disk = _OpenRealBD(disk)
1642
1643   # the block size on the read dd is 1MiB to match our units
1644   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1645                                "dd if=%s bs=1048576 count=%s",
1646                                real_disk.dev_path, str(disk.size))
1647
1648   # we set here a smaller block size as, due to ssh buffering, more
1649   # than 64-128k will mostly ignored; we use nocreat to fail if the
1650   # device is not already there or we pass a wrong path; we use
1651   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1652   # to not buffer too much memory; this means that at best, we flush
1653   # every 64k, which will not be very fast
1654   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1655                                 " oflag=dsync", dest_path)
1656
1657   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1658                                                    constants.GANETI_RUNAS,
1659                                                    destcmd)
1660
1661   # all commands have been checked, so we're safe to combine them
1662   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1663
1664   result = utils.RunCmd(["bash", "-c", command])
1665
1666   if result.failed:
1667     _Fail("Disk copy command '%s' returned error: %s"
1668           " output: %s", command, result.fail_reason, result.output)
1669
1670
1671 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1672   """Write a file to the filesystem.
1673
1674   This allows the master to overwrite(!) a file. It will only perform
1675   the operation if the file belongs to a list of configuration files.
1676
1677   @type file_name: str
1678   @param file_name: the target file name
1679   @type data: str
1680   @param data: the new contents of the file
1681   @type mode: int
1682   @param mode: the mode to give the file (can be None)
1683   @type uid: int
1684   @param uid: the owner of the file (can be -1 for default)
1685   @type gid: int
1686   @param gid: the group of the file (can be -1 for default)
1687   @type atime: float
1688   @param atime: the atime to set on the file (can be None)
1689   @type mtime: float
1690   @param mtime: the mtime to set on the file (can be None)
1691   @rtype: None
1692
1693   """
1694   if not os.path.isabs(file_name):
1695     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1696
1697   if file_name not in _ALLOWED_UPLOAD_FILES:
1698     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1699           file_name)
1700
1701   raw_data = _Decompress(data)
1702
1703   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1704                   atime=atime, mtime=mtime)
1705
1706
1707 def WriteSsconfFiles(values):
1708   """Update all ssconf files.
1709
1710   Wrapper around the SimpleStore.WriteFiles.
1711
1712   """
1713   ssconf.SimpleStore().WriteFiles(values)
1714
1715
1716 def _ErrnoOrStr(err):
1717   """Format an EnvironmentError exception.
1718
1719   If the L{err} argument has an errno attribute, it will be looked up
1720   and converted into a textual C{E...} description. Otherwise the
1721   string representation of the error will be returned.
1722
1723   @type err: L{EnvironmentError}
1724   @param err: the exception to format
1725
1726   """
1727   if hasattr(err, 'errno'):
1728     detail = errno.errorcode[err.errno]
1729   else:
1730     detail = str(err)
1731   return detail
1732
1733
1734 def _OSOndiskAPIVersion(os_dir):
1735   """Compute and return the API version of a given OS.
1736
1737   This function will try to read the API version of the OS residing in
1738   the 'os_dir' directory.
1739
1740   @type os_dir: str
1741   @param os_dir: the directory in which we should look for the OS
1742   @rtype: tuple
1743   @return: tuple (status, data) with status denoting the validity and
1744       data holding either the vaid versions or an error message
1745
1746   """
1747   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1748
1749   try:
1750     st = os.stat(api_file)
1751   except EnvironmentError, err:
1752     return False, ("Required file '%s' not found under path %s: %s" %
1753                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1754
1755   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1756     return False, ("File '%s' in %s is not a regular file" %
1757                    (constants.OS_API_FILE, os_dir))
1758
1759   try:
1760     api_versions = utils.ReadFile(api_file).splitlines()
1761   except EnvironmentError, err:
1762     return False, ("Error while reading the API version file at %s: %s" %
1763                    (api_file, _ErrnoOrStr(err)))
1764
1765   try:
1766     api_versions = [int(version.strip()) for version in api_versions]
1767   except (TypeError, ValueError), err:
1768     return False, ("API version(s) can't be converted to integer: %s" %
1769                    str(err))
1770
1771   return True, api_versions
1772
1773
1774 def DiagnoseOS(top_dirs=None):
1775   """Compute the validity for all OSes.
1776
1777   @type top_dirs: list
1778   @param top_dirs: the list of directories in which to
1779       search (if not given defaults to
1780       L{constants.OS_SEARCH_PATH})
1781   @rtype: list of L{objects.OS}
1782   @return: a list of tuples (name, path, status, diagnose, variants,
1783       parameters, api_version) for all (potential) OSes under all
1784       search paths, where:
1785           - name is the (potential) OS name
1786           - path is the full path to the OS
1787           - status True/False is the validity of the OS
1788           - diagnose is the error message for an invalid OS, otherwise empty
1789           - variants is a list of supported OS variants, if any
1790           - parameters is a list of (name, help) parameters, if any
1791           - api_version is a list of support OS API versions
1792
1793   """
1794   if top_dirs is None:
1795     top_dirs = constants.OS_SEARCH_PATH
1796
1797   result = []
1798   for dir_name in top_dirs:
1799     if os.path.isdir(dir_name):
1800       try:
1801         f_names = utils.ListVisibleFiles(dir_name)
1802       except EnvironmentError, err:
1803         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1804         break
1805       for name in f_names:
1806         os_path = utils.PathJoin(dir_name, name)
1807         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1808         if status:
1809           diagnose = ""
1810           variants = os_inst.supported_variants
1811           parameters = os_inst.supported_parameters
1812           api_versions = os_inst.api_versions
1813         else:
1814           diagnose = os_inst
1815           variants = parameters = api_versions = []
1816         result.append((name, os_path, status, diagnose, variants,
1817                        parameters, api_versions))
1818
1819   return result
1820
1821
1822 def _TryOSFromDisk(name, base_dir=None):
1823   """Create an OS instance from disk.
1824
1825   This function will return an OS instance if the given name is a
1826   valid OS name.
1827
1828   @type base_dir: string
1829   @keyword base_dir: Base directory containing OS installations.
1830                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1831   @rtype: tuple
1832   @return: success and either the OS instance if we find a valid one,
1833       or error message
1834
1835   """
1836   if base_dir is None:
1837     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1838   else:
1839     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1840
1841   if os_dir is None:
1842     return False, "Directory for OS %s not found in search path" % name
1843
1844   status, api_versions = _OSOndiskAPIVersion(os_dir)
1845   if not status:
1846     # push the error up
1847     return status, api_versions
1848
1849   if not constants.OS_API_VERSIONS.intersection(api_versions):
1850     return False, ("API version mismatch for path '%s': found %s, want %s." %
1851                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1852
1853   # OS Files dictionary, we will populate it with the absolute path names
1854   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1855
1856   if max(api_versions) >= constants.OS_API_V15:
1857     os_files[constants.OS_VARIANTS_FILE] = ''
1858
1859   if max(api_versions) >= constants.OS_API_V20:
1860     os_files[constants.OS_PARAMETERS_FILE] = ''
1861   else:
1862     del os_files[constants.OS_SCRIPT_VERIFY]
1863
1864   for filename in os_files:
1865     os_files[filename] = utils.PathJoin(os_dir, filename)
1866
1867     try:
1868       st = os.stat(os_files[filename])
1869     except EnvironmentError, err:
1870       return False, ("File '%s' under path '%s' is missing (%s)" %
1871                      (filename, os_dir, _ErrnoOrStr(err)))
1872
1873     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1874       return False, ("File '%s' under path '%s' is not a regular file" %
1875                      (filename, os_dir))
1876
1877     if filename in constants.OS_SCRIPTS:
1878       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1879         return False, ("File '%s' under path '%s' is not executable" %
1880                        (filename, os_dir))
1881
1882   variants = []
1883   if constants.OS_VARIANTS_FILE in os_files:
1884     variants_file = os_files[constants.OS_VARIANTS_FILE]
1885     try:
1886       variants = utils.ReadFile(variants_file).splitlines()
1887     except EnvironmentError, err:
1888       return False, ("Error while reading the OS variants file at %s: %s" %
1889                      (variants_file, _ErrnoOrStr(err)))
1890     if not variants:
1891       return False, ("No supported os variant found")
1892
1893   parameters = []
1894   if constants.OS_PARAMETERS_FILE in os_files:
1895     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1896     try:
1897       parameters = utils.ReadFile(parameters_file).splitlines()
1898     except EnvironmentError, err:
1899       return False, ("Error while reading the OS parameters file at %s: %s" %
1900                      (parameters_file, _ErrnoOrStr(err)))
1901     parameters = [v.split(None, 1) for v in parameters]
1902
1903   os_obj = objects.OS(name=name, path=os_dir,
1904                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1905                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1906                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1907                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1908                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1909                                                  None),
1910                       supported_variants=variants,
1911                       supported_parameters=parameters,
1912                       api_versions=api_versions)
1913   return True, os_obj
1914
1915
1916 def OSFromDisk(name, base_dir=None):
1917   """Create an OS instance from disk.
1918
1919   This function will return an OS instance if the given name is a
1920   valid OS name. Otherwise, it will raise an appropriate
1921   L{RPCFail} exception, detailing why this is not a valid OS.
1922
1923   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1924   an exception but returns true/false status data.
1925
1926   @type base_dir: string
1927   @keyword base_dir: Base directory containing OS installations.
1928                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1929   @rtype: L{objects.OS}
1930   @return: the OS instance if we find a valid one
1931   @raise RPCFail: if we don't find a valid OS
1932
1933   """
1934   name_only = name.split("+", 1)[0]
1935   status, payload = _TryOSFromDisk(name_only, base_dir)
1936
1937   if not status:
1938     _Fail(payload)
1939
1940   return payload
1941
1942
1943 def OSCoreEnv(inst_os, os_params, debug=0):
1944   """Calculate the basic environment for an os script.
1945
1946   @type inst_os: L{objects.OS}
1947   @param inst_os: operating system for which the environment is being built
1948   @type os_params: dict
1949   @param os_params: the OS parameters
1950   @type debug: integer
1951   @param debug: debug level (0 or 1, for OS Api 10)
1952   @rtype: dict
1953   @return: dict of environment variables
1954   @raise errors.BlockDeviceError: if the block device
1955       cannot be found
1956
1957   """
1958   result = {}
1959   api_version = \
1960     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1961   result['OS_API_VERSION'] = '%d' % api_version
1962   result['OS_NAME'] = inst_os.name
1963   result['DEBUG_LEVEL'] = '%d' % debug
1964
1965   # OS variants
1966   if api_version >= constants.OS_API_V15:
1967     try:
1968       variant = inst_os.name.split('+', 1)[1]
1969     except IndexError:
1970       variant = inst_os.supported_variants[0]
1971     result['OS_VARIANT'] = variant
1972
1973   # OS params
1974   for pname, pvalue in os_params.items():
1975     result['OSP_%s' % pname.upper()] = pvalue
1976
1977   return result
1978
1979
1980 def OSEnvironment(instance, inst_os, debug=0):
1981   """Calculate the environment for an os script.
1982
1983   @type instance: L{objects.Instance}
1984   @param instance: target instance for the os script run
1985   @type inst_os: L{objects.OS}
1986   @param inst_os: operating system for which the environment is being built
1987   @type debug: integer
1988   @param debug: debug level (0 or 1, for OS Api 10)
1989   @rtype: dict
1990   @return: dict of environment variables
1991   @raise errors.BlockDeviceError: if the block device
1992       cannot be found
1993
1994   """
1995   result = OSCoreEnv(inst_os, instance.osparams, debug=debug)
1996
1997   result['INSTANCE_NAME'] = instance.name
1998   result['INSTANCE_OS'] = instance.os
1999   result['HYPERVISOR'] = instance.hypervisor
2000   result['DISK_COUNT'] = '%d' % len(instance.disks)
2001   result['NIC_COUNT'] = '%d' % len(instance.nics)
2002
2003   # Disks
2004   for idx, disk in enumerate(instance.disks):
2005     real_disk = _OpenRealBD(disk)
2006     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2007     result['DISK_%d_ACCESS' % idx] = disk.mode
2008     if constants.HV_DISK_TYPE in instance.hvparams:
2009       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2010         instance.hvparams[constants.HV_DISK_TYPE]
2011     if disk.dev_type in constants.LDS_BLOCK:
2012       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2013     elif disk.dev_type == constants.LD_FILE:
2014       result['DISK_%d_BACKEND_TYPE' % idx] = \
2015         'file:%s' % disk.physical_id[0]
2016
2017   # NICs
2018   for idx, nic in enumerate(instance.nics):
2019     result['NIC_%d_MAC' % idx] = nic.mac
2020     if nic.ip:
2021       result['NIC_%d_IP' % idx] = nic.ip
2022     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2023     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2024       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2025     if nic.nicparams[constants.NIC_LINK]:
2026       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2027     if constants.HV_NIC_TYPE in instance.hvparams:
2028       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2029         instance.hvparams[constants.HV_NIC_TYPE]
2030
2031   # HV/BE params
2032   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2033     for key, value in source.items():
2034       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2035
2036   return result
2037
2038
2039 def BlockdevGrow(disk, amount):
2040   """Grow a stack of block devices.
2041
2042   This function is called recursively, with the childrens being the
2043   first ones to resize.
2044
2045   @type disk: L{objects.Disk}
2046   @param disk: the disk to be grown
2047   @rtype: (status, result)
2048   @return: a tuple with the status of the operation
2049       (True/False), and the errors message if status
2050       is False
2051
2052   """
2053   r_dev = _RecursiveFindBD(disk)
2054   if r_dev is None:
2055     _Fail("Cannot find block device %s", disk)
2056
2057   try:
2058     r_dev.Grow(amount)
2059   except errors.BlockDeviceError, err:
2060     _Fail("Failed to grow block device: %s", err, exc=True)
2061
2062
2063 def BlockdevSnapshot(disk):
2064   """Create a snapshot copy of a block device.
2065
2066   This function is called recursively, and the snapshot is actually created
2067   just for the leaf lvm backend device.
2068
2069   @type disk: L{objects.Disk}
2070   @param disk: the disk to be snapshotted
2071   @rtype: string
2072   @return: snapshot disk path
2073
2074   """
2075   if disk.dev_type == constants.LD_DRBD8:
2076     if not disk.children:
2077       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2078             disk.unique_id)
2079     return BlockdevSnapshot(disk.children[0])
2080   elif disk.dev_type == constants.LD_LV:
2081     r_dev = _RecursiveFindBD(disk)
2082     if r_dev is not None:
2083       # FIXME: choose a saner value for the snapshot size
2084       # let's stay on the safe side and ask for the full size, for now
2085       return r_dev.Snapshot(disk.size)
2086     else:
2087       _Fail("Cannot find block device %s", disk)
2088   else:
2089     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2090           disk.unique_id, disk.dev_type)
2091
2092
2093 def FinalizeExport(instance, snap_disks):
2094   """Write out the export configuration information.
2095
2096   @type instance: L{objects.Instance}
2097   @param instance: the instance which we export, used for
2098       saving configuration
2099   @type snap_disks: list of L{objects.Disk}
2100   @param snap_disks: list of snapshot block devices, which
2101       will be used to get the actual name of the dump file
2102
2103   @rtype: None
2104
2105   """
2106   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2107   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2108
2109   config = objects.SerializableConfigParser()
2110
2111   config.add_section(constants.INISECT_EXP)
2112   config.set(constants.INISECT_EXP, 'version', '0')
2113   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2114   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2115   config.set(constants.INISECT_EXP, 'os', instance.os)
2116   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2117
2118   config.add_section(constants.INISECT_INS)
2119   config.set(constants.INISECT_INS, 'name', instance.name)
2120   config.set(constants.INISECT_INS, 'memory', '%d' %
2121              instance.beparams[constants.BE_MEMORY])
2122   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2123              instance.beparams[constants.BE_VCPUS])
2124   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2125   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2126
2127   nic_total = 0
2128   for nic_count, nic in enumerate(instance.nics):
2129     nic_total += 1
2130     config.set(constants.INISECT_INS, 'nic%d_mac' %
2131                nic_count, '%s' % nic.mac)
2132     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2133     for param in constants.NICS_PARAMETER_TYPES:
2134       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2135                  '%s' % nic.nicparams.get(param, None))
2136   # TODO: redundant: on load can read nics until it doesn't exist
2137   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2138
2139   disk_total = 0
2140   for disk_count, disk in enumerate(snap_disks):
2141     if disk:
2142       disk_total += 1
2143       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2144                  ('%s' % disk.iv_name))
2145       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2146                  ('%s' % disk.physical_id[1]))
2147       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2148                  ('%d' % disk.size))
2149
2150   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2151
2152   # New-style hypervisor/backend parameters
2153
2154   config.add_section(constants.INISECT_HYP)
2155   for name, value in instance.hvparams.items():
2156     if name not in constants.HVC_GLOBALS:
2157       config.set(constants.INISECT_HYP, name, str(value))
2158
2159   config.add_section(constants.INISECT_BEP)
2160   for name, value in instance.beparams.items():
2161     config.set(constants.INISECT_BEP, name, str(value))
2162
2163   config.add_section(constants.INISECT_OSP)
2164   for name, value in instance.osparams.items():
2165     config.set(constants.INISECT_OSP, name, str(value))
2166
2167   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2168                   data=config.Dumps())
2169   shutil.rmtree(finaldestdir, ignore_errors=True)
2170   shutil.move(destdir, finaldestdir)
2171
2172
2173 def ExportInfo(dest):
2174   """Get export configuration information.
2175
2176   @type dest: str
2177   @param dest: directory containing the export
2178
2179   @rtype: L{objects.SerializableConfigParser}
2180   @return: a serializable config file containing the
2181       export info
2182
2183   """
2184   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2185
2186   config = objects.SerializableConfigParser()
2187   config.read(cff)
2188
2189   if (not config.has_section(constants.INISECT_EXP) or
2190       not config.has_section(constants.INISECT_INS)):
2191     _Fail("Export info file doesn't have the required fields")
2192
2193   return config.Dumps()
2194
2195
2196 def ListExports():
2197   """Return a list of exports currently available on this machine.
2198
2199   @rtype: list
2200   @return: list of the exports
2201
2202   """
2203   if os.path.isdir(constants.EXPORT_DIR):
2204     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2205   else:
2206     _Fail("No exports directory")
2207
2208
2209 def RemoveExport(export):
2210   """Remove an existing export from the node.
2211
2212   @type export: str
2213   @param export: the name of the export to remove
2214   @rtype: None
2215
2216   """
2217   target = utils.PathJoin(constants.EXPORT_DIR, export)
2218
2219   try:
2220     shutil.rmtree(target)
2221   except EnvironmentError, err:
2222     _Fail("Error while removing the export: %s", err, exc=True)
2223
2224
2225 def BlockdevRename(devlist):
2226   """Rename a list of block devices.
2227
2228   @type devlist: list of tuples
2229   @param devlist: list of tuples of the form  (disk,
2230       new_logical_id, new_physical_id); disk is an
2231       L{objects.Disk} object describing the current disk,
2232       and new logical_id/physical_id is the name we
2233       rename it to
2234   @rtype: boolean
2235   @return: True if all renames succeeded, False otherwise
2236
2237   """
2238   msgs = []
2239   result = True
2240   for disk, unique_id in devlist:
2241     dev = _RecursiveFindBD(disk)
2242     if dev is None:
2243       msgs.append("Can't find device %s in rename" % str(disk))
2244       result = False
2245       continue
2246     try:
2247       old_rpath = dev.dev_path
2248       dev.Rename(unique_id)
2249       new_rpath = dev.dev_path
2250       if old_rpath != new_rpath:
2251         DevCacheManager.RemoveCache(old_rpath)
2252         # FIXME: we should add the new cache information here, like:
2253         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2254         # but we don't have the owner here - maybe parse from existing
2255         # cache? for now, we only lose lvm data when we rename, which
2256         # is less critical than DRBD or MD
2257     except errors.BlockDeviceError, err:
2258       msgs.append("Can't rename device '%s' to '%s': %s" %
2259                   (dev, unique_id, err))
2260       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2261       result = False
2262   if not result:
2263     _Fail("; ".join(msgs))
2264
2265
2266 def _TransformFileStorageDir(file_storage_dir):
2267   """Checks whether given file_storage_dir is valid.
2268
2269   Checks wheter the given file_storage_dir is within the cluster-wide
2270   default file_storage_dir stored in SimpleStore. Only paths under that
2271   directory are allowed.
2272
2273   @type file_storage_dir: str
2274   @param file_storage_dir: the path to check
2275
2276   @return: the normalized path if valid, None otherwise
2277
2278   """
2279   if not constants.ENABLE_FILE_STORAGE:
2280     _Fail("File storage disabled at configure time")
2281   cfg = _GetConfig()
2282   file_storage_dir = os.path.normpath(file_storage_dir)
2283   base_file_storage_dir = cfg.GetFileStorageDir()
2284   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2285       base_file_storage_dir):
2286     _Fail("File storage directory '%s' is not under base file"
2287           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2288   return file_storage_dir
2289
2290
2291 def CreateFileStorageDir(file_storage_dir):
2292   """Create file storage directory.
2293
2294   @type file_storage_dir: str
2295   @param file_storage_dir: directory to create
2296
2297   @rtype: tuple
2298   @return: tuple with first element a boolean indicating wheter dir
2299       creation was successful or not
2300
2301   """
2302   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2303   if os.path.exists(file_storage_dir):
2304     if not os.path.isdir(file_storage_dir):
2305       _Fail("Specified storage dir '%s' is not a directory",
2306             file_storage_dir)
2307   else:
2308     try:
2309       os.makedirs(file_storage_dir, 0750)
2310     except OSError, err:
2311       _Fail("Cannot create file storage directory '%s': %s",
2312             file_storage_dir, err, exc=True)
2313
2314
2315 def RemoveFileStorageDir(file_storage_dir):
2316   """Remove file storage directory.
2317
2318   Remove it only if it's empty. If not log an error and return.
2319
2320   @type file_storage_dir: str
2321   @param file_storage_dir: the directory we should cleanup
2322   @rtype: tuple (success,)
2323   @return: tuple of one element, C{success}, denoting
2324       whether the operation was successful
2325
2326   """
2327   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2328   if os.path.exists(file_storage_dir):
2329     if not os.path.isdir(file_storage_dir):
2330       _Fail("Specified Storage directory '%s' is not a directory",
2331             file_storage_dir)
2332     # deletes dir only if empty, otherwise we want to fail the rpc call
2333     try:
2334       os.rmdir(file_storage_dir)
2335     except OSError, err:
2336       _Fail("Cannot remove file storage directory '%s': %s",
2337             file_storage_dir, err)
2338
2339
2340 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2341   """Rename the file storage directory.
2342
2343   @type old_file_storage_dir: str
2344   @param old_file_storage_dir: the current path
2345   @type new_file_storage_dir: str
2346   @param new_file_storage_dir: the name we should rename to
2347   @rtype: tuple (success,)
2348   @return: tuple of one element, C{success}, denoting
2349       whether the operation was successful
2350
2351   """
2352   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2353   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2354   if not os.path.exists(new_file_storage_dir):
2355     if os.path.isdir(old_file_storage_dir):
2356       try:
2357         os.rename(old_file_storage_dir, new_file_storage_dir)
2358       except OSError, err:
2359         _Fail("Cannot rename '%s' to '%s': %s",
2360               old_file_storage_dir, new_file_storage_dir, err)
2361     else:
2362       _Fail("Specified storage dir '%s' is not a directory",
2363             old_file_storage_dir)
2364   else:
2365     if os.path.exists(old_file_storage_dir):
2366       _Fail("Cannot rename '%s' to '%s': both locations exist",
2367             old_file_storage_dir, new_file_storage_dir)
2368
2369
2370 def _EnsureJobQueueFile(file_name):
2371   """Checks whether the given filename is in the queue directory.
2372
2373   @type file_name: str
2374   @param file_name: the file name we should check
2375   @rtype: None
2376   @raises RPCFail: if the file is not valid
2377
2378   """
2379   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2380   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2381
2382   if not result:
2383     _Fail("Passed job queue file '%s' does not belong to"
2384           " the queue directory '%s'", file_name, queue_dir)
2385
2386
2387 def JobQueueUpdate(file_name, content):
2388   """Updates a file in the queue directory.
2389
2390   This is just a wrapper over L{utils.WriteFile}, with proper
2391   checking.
2392
2393   @type file_name: str
2394   @param file_name: the job file name
2395   @type content: str
2396   @param content: the new job contents
2397   @rtype: boolean
2398   @return: the success of the operation
2399
2400   """
2401   _EnsureJobQueueFile(file_name)
2402
2403   # Write and replace the file atomically
2404   utils.WriteFile(file_name, data=_Decompress(content))
2405
2406
2407 def JobQueueRename(old, new):
2408   """Renames a job queue file.
2409
2410   This is just a wrapper over os.rename with proper checking.
2411
2412   @type old: str
2413   @param old: the old (actual) file name
2414   @type new: str
2415   @param new: the desired file name
2416   @rtype: tuple
2417   @return: the success of the operation and payload
2418
2419   """
2420   _EnsureJobQueueFile(old)
2421   _EnsureJobQueueFile(new)
2422
2423   utils.RenameFile(old, new, mkdir=True)
2424
2425
2426 def BlockdevClose(instance_name, disks):
2427   """Closes the given block devices.
2428
2429   This means they will be switched to secondary mode (in case of
2430   DRBD).
2431
2432   @param instance_name: if the argument is not empty, the symlinks
2433       of this instance will be removed
2434   @type disks: list of L{objects.Disk}
2435   @param disks: the list of disks to be closed
2436   @rtype: tuple (success, message)
2437   @return: a tuple of success and message, where success
2438       indicates the succes of the operation, and message
2439       which will contain the error details in case we
2440       failed
2441
2442   """
2443   bdevs = []
2444   for cf in disks:
2445     rd = _RecursiveFindBD(cf)
2446     if rd is None:
2447       _Fail("Can't find device %s", cf)
2448     bdevs.append(rd)
2449
2450   msg = []
2451   for rd in bdevs:
2452     try:
2453       rd.Close()
2454     except errors.BlockDeviceError, err:
2455       msg.append(str(err))
2456   if msg:
2457     _Fail("Can't make devices secondary: %s", ",".join(msg))
2458   else:
2459     if instance_name:
2460       _RemoveBlockDevLinks(instance_name, disks)
2461
2462
2463 def ValidateHVParams(hvname, hvparams):
2464   """Validates the given hypervisor parameters.
2465
2466   @type hvname: string
2467   @param hvname: the hypervisor name
2468   @type hvparams: dict
2469   @param hvparams: the hypervisor parameters to be validated
2470   @rtype: None
2471
2472   """
2473   try:
2474     hv_type = hypervisor.GetHypervisor(hvname)
2475     hv_type.ValidateParameters(hvparams)
2476   except errors.HypervisorError, err:
2477     _Fail(str(err), log=False)
2478
2479
2480 def _CheckOSPList(os_obj, parameters):
2481   """Check whether a list of parameters is supported by the OS.
2482
2483   @type os_obj: L{objects.OS}
2484   @param os_obj: OS object to check
2485   @type parameters: list
2486   @param parameters: the list of parameters to check
2487
2488   """
2489   supported = [v[0] for v in os_obj.supported_parameters]
2490   delta = frozenset(parameters).difference(supported)
2491   if delta:
2492     _Fail("The following parameters are not supported"
2493           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2494
2495
2496 def ValidateOS(required, osname, checks, osparams):
2497   """Validate the given OS' parameters.
2498
2499   @type required: boolean
2500   @param required: whether absence of the OS should translate into
2501       failure or not
2502   @type osname: string
2503   @param osname: the OS to be validated
2504   @type checks: list
2505   @param checks: list of the checks to run (currently only 'parameters')
2506   @type osparams: dict
2507   @param osparams: dictionary with OS parameters
2508   @rtype: boolean
2509   @return: True if the validation passed, or False if the OS was not
2510       found and L{required} was false
2511
2512   """
2513   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2514     _Fail("Unknown checks required for OS %s: %s", osname,
2515           set(checks).difference(constants.OS_VALIDATE_CALLS))
2516
2517   name_only = osname.split("+", 1)[0]
2518   status, tbv = _TryOSFromDisk(name_only, None)
2519
2520   if not status:
2521     if required:
2522       _Fail(tbv)
2523     else:
2524       return False
2525
2526   if max(tbv.api_versions) < constants.OS_API_V20:
2527     return True
2528
2529   if constants.OS_VALIDATE_PARAMETERS in checks:
2530     _CheckOSPList(tbv, osparams.keys())
2531
2532   validate_env = OSCoreEnv(tbv, osparams)
2533   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2534                         cwd=tbv.path)
2535   if result.failed:
2536     logging.error("os validate command '%s' returned error: %s output: %s",
2537                   result.cmd, result.fail_reason, result.output)
2538     _Fail("OS validation script failed (%s), output: %s",
2539           result.fail_reason, result.output, log=False)
2540
2541   return True
2542
2543
2544 def DemoteFromMC():
2545   """Demotes the current node from master candidate role.
2546
2547   """
2548   # try to ensure we're not the master by mistake
2549   master, myself = ssconf.GetMasterAndMyself()
2550   if master == myself:
2551     _Fail("ssconf status shows I'm the master node, will not demote")
2552
2553   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2554   if not result.failed:
2555     _Fail("The master daemon is running, will not demote")
2556
2557   try:
2558     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2559       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2560   except EnvironmentError, err:
2561     if err.errno != errno.ENOENT:
2562       _Fail("Error while backing up cluster file: %s", err, exc=True)
2563
2564   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2565
2566
2567 def _GetX509Filenames(cryptodir, name):
2568   """Returns the full paths for the private key and certificate.
2569
2570   """
2571   return (utils.PathJoin(cryptodir, name),
2572           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2573           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2574
2575
2576 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2577   """Creates a new X509 certificate for SSL/TLS.
2578
2579   @type validity: int
2580   @param validity: Validity in seconds
2581   @rtype: tuple; (string, string)
2582   @return: Certificate name and public part
2583
2584   """
2585   (key_pem, cert_pem) = \
2586     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2587                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2588
2589   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2590                               prefix="x509-%s-" % utils.TimestampForFilename())
2591   try:
2592     name = os.path.basename(cert_dir)
2593     assert len(name) > 5
2594
2595     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2596
2597     utils.WriteFile(key_file, mode=0400, data=key_pem)
2598     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2599
2600     # Never return private key as it shouldn't leave the node
2601     return (name, cert_pem)
2602   except Exception:
2603     shutil.rmtree(cert_dir, ignore_errors=True)
2604     raise
2605
2606
2607 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2608   """Removes a X509 certificate.
2609
2610   @type name: string
2611   @param name: Certificate name
2612
2613   """
2614   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2615
2616   utils.RemoveFile(key_file)
2617   utils.RemoveFile(cert_file)
2618
2619   try:
2620     os.rmdir(cert_dir)
2621   except EnvironmentError, err:
2622     _Fail("Cannot remove certificate directory '%s': %s",
2623           cert_dir, err)
2624
2625
2626 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2627   """Returns the command for the requested input/output.
2628
2629   @type instance: L{objects.Instance}
2630   @param instance: The instance object
2631   @param mode: Import/export mode
2632   @param ieio: Input/output type
2633   @param ieargs: Input/output arguments
2634
2635   """
2636   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2637
2638   env = None
2639   prefix = None
2640   suffix = None
2641   exp_size = None
2642
2643   if ieio == constants.IEIO_FILE:
2644     (filename, ) = ieargs
2645
2646     if not utils.IsNormAbsPath(filename):
2647       _Fail("Path '%s' is not normalized or absolute", filename)
2648
2649     directory = os.path.normpath(os.path.dirname(filename))
2650
2651     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2652         constants.EXPORT_DIR):
2653       _Fail("File '%s' is not under exports directory '%s'",
2654             filename, constants.EXPORT_DIR)
2655
2656     # Create directory
2657     utils.Makedirs(directory, mode=0750)
2658
2659     quoted_filename = utils.ShellQuote(filename)
2660
2661     if mode == constants.IEM_IMPORT:
2662       suffix = "> %s" % quoted_filename
2663     elif mode == constants.IEM_EXPORT:
2664       suffix = "< %s" % quoted_filename
2665
2666       # Retrieve file size
2667       try:
2668         st = os.stat(filename)
2669       except EnvironmentError, err:
2670         logging.error("Can't stat(2) %s: %s", filename, err)
2671       else:
2672         exp_size = utils.BytesToMebibyte(st.st_size)
2673
2674   elif ieio == constants.IEIO_RAW_DISK:
2675     (disk, ) = ieargs
2676
2677     real_disk = _OpenRealBD(disk)
2678
2679     if mode == constants.IEM_IMPORT:
2680       # we set here a smaller block size as, due to transport buffering, more
2681       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2682       # is not already there or we pass a wrong path; we use notrunc to no
2683       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2684       # much memory; this means that at best, we flush every 64k, which will
2685       # not be very fast
2686       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2687                                     " bs=%s oflag=dsync"),
2688                                     real_disk.dev_path,
2689                                     str(64 * 1024))
2690
2691     elif mode == constants.IEM_EXPORT:
2692       # the block size on the read dd is 1MiB to match our units
2693       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2694                                    real_disk.dev_path,
2695                                    str(1024 * 1024), # 1 MB
2696                                    str(disk.size))
2697       exp_size = disk.size
2698
2699   elif ieio == constants.IEIO_SCRIPT:
2700     (disk, disk_index, ) = ieargs
2701
2702     assert isinstance(disk_index, (int, long))
2703
2704     real_disk = _OpenRealBD(disk)
2705
2706     inst_os = OSFromDisk(instance.os)
2707     env = OSEnvironment(instance, inst_os)
2708
2709     if mode == constants.IEM_IMPORT:
2710       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2711       env["IMPORT_INDEX"] = str(disk_index)
2712       script = inst_os.import_script
2713
2714     elif mode == constants.IEM_EXPORT:
2715       env["EXPORT_DEVICE"] = real_disk.dev_path
2716       env["EXPORT_INDEX"] = str(disk_index)
2717       script = inst_os.export_script
2718
2719     # TODO: Pass special environment only to script
2720     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2721
2722     if mode == constants.IEM_IMPORT:
2723       suffix = "| %s" % script_cmd
2724
2725     elif mode == constants.IEM_EXPORT:
2726       prefix = "%s |" % script_cmd
2727
2728     # Let script predict size
2729     exp_size = constants.IE_CUSTOM_SIZE
2730
2731   else:
2732     _Fail("Invalid %s I/O mode %r", mode, ieio)
2733
2734   return (env, prefix, suffix, exp_size)
2735
2736
2737 def _CreateImportExportStatusDir(prefix):
2738   """Creates status directory for import/export.
2739
2740   """
2741   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2742                           prefix=("%s-%s-" %
2743                                   (prefix, utils.TimestampForFilename())))
2744
2745
2746 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2747   """Starts an import or export daemon.
2748
2749   @param mode: Import/output mode
2750   @type opts: L{objects.ImportExportOptions}
2751   @param opts: Daemon options
2752   @type host: string
2753   @param host: Remote host for export (None for import)
2754   @type port: int
2755   @param port: Remote port for export (None for import)
2756   @type instance: L{objects.Instance}
2757   @param instance: Instance object
2758   @param ieio: Input/output type
2759   @param ieioargs: Input/output arguments
2760
2761   """
2762   if mode == constants.IEM_IMPORT:
2763     prefix = "import"
2764
2765     if not (host is None and port is None):
2766       _Fail("Can not specify host or port on import")
2767
2768   elif mode == constants.IEM_EXPORT:
2769     prefix = "export"
2770
2771     if host is None or port is None:
2772       _Fail("Host and port must be specified for an export")
2773
2774   else:
2775     _Fail("Invalid mode %r", mode)
2776
2777   if (opts.key_name is None) ^ (opts.ca_pem is None):
2778     _Fail("Cluster certificate can only be used for both key and CA")
2779
2780   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2781     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2782
2783   if opts.key_name is None:
2784     # Use server.pem
2785     key_path = constants.NODED_CERT_FILE
2786     cert_path = constants.NODED_CERT_FILE
2787     assert opts.ca_pem is None
2788   else:
2789     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2790                                                  opts.key_name)
2791     assert opts.ca_pem is not None
2792
2793   for i in [key_path, cert_path]:
2794     if not os.path.exists(i):
2795       _Fail("File '%s' does not exist" % i)
2796
2797   status_dir = _CreateImportExportStatusDir(prefix)
2798   try:
2799     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2800     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2801     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2802
2803     if opts.ca_pem is None:
2804       # Use server.pem
2805       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2806     else:
2807       ca = opts.ca_pem
2808
2809     # Write CA file
2810     utils.WriteFile(ca_file, data=ca, mode=0400)
2811
2812     cmd = [
2813       constants.IMPORT_EXPORT_DAEMON,
2814       status_file, mode,
2815       "--key=%s" % key_path,
2816       "--cert=%s" % cert_path,
2817       "--ca=%s" % ca_file,
2818       ]
2819
2820     if host:
2821       cmd.append("--host=%s" % host)
2822
2823     if port:
2824       cmd.append("--port=%s" % port)
2825
2826     if opts.compress:
2827       cmd.append("--compress=%s" % opts.compress)
2828
2829     if opts.magic:
2830       cmd.append("--magic=%s" % opts.magic)
2831
2832     if exp_size is not None:
2833       cmd.append("--expected-size=%s" % exp_size)
2834
2835     if cmd_prefix:
2836       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2837
2838     if cmd_suffix:
2839       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2840
2841     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2842
2843     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2844     # support for receiving a file descriptor for output
2845     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2846                       output=logfile)
2847
2848     # The import/export name is simply the status directory name
2849     return os.path.basename(status_dir)
2850
2851   except Exception:
2852     shutil.rmtree(status_dir, ignore_errors=True)
2853     raise
2854
2855
2856 def GetImportExportStatus(names):
2857   """Returns import/export daemon status.
2858
2859   @type names: sequence
2860   @param names: List of names
2861   @rtype: List of dicts
2862   @return: Returns a list of the state of each named import/export or None if a
2863            status couldn't be read
2864
2865   """
2866   result = []
2867
2868   for name in names:
2869     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2870                                  _IES_STATUS_FILE)
2871
2872     try:
2873       data = utils.ReadFile(status_file)
2874     except EnvironmentError, err:
2875       if err.errno != errno.ENOENT:
2876         raise
2877       data = None
2878
2879     if not data:
2880       result.append(None)
2881       continue
2882
2883     result.append(serializer.LoadJson(data))
2884
2885   return result
2886
2887
2888 def AbortImportExport(name):
2889   """Sends SIGTERM to a running import/export daemon.
2890
2891   """
2892   logging.info("Abort import/export %s", name)
2893
2894   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2895   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2896
2897   if pid:
2898     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2899                  name, pid)
2900     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2901
2902
2903 def CleanupImportExport(name):
2904   """Cleanup after an import or export.
2905
2906   If the import/export daemon is still running it's killed. Afterwards the
2907   whole status directory is removed.
2908
2909   """
2910   logging.info("Finalizing import/export %s", name)
2911
2912   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2913
2914   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2915
2916   if pid:
2917     logging.info("Import/export %s is still running with PID %s",
2918                  name, pid)
2919     utils.KillProcess(pid, waitpid=False)
2920
2921   shutil.rmtree(status_dir, ignore_errors=True)
2922
2923
2924 def _FindDisks(nodes_ip, disks):
2925   """Sets the physical ID on disks and returns the block devices.
2926
2927   """
2928   # set the correct physical ID
2929   my_name = netutils.Hostname.GetSysName()
2930   for cf in disks:
2931     cf.SetPhysicalID(my_name, nodes_ip)
2932
2933   bdevs = []
2934
2935   for cf in disks:
2936     rd = _RecursiveFindBD(cf)
2937     if rd is None:
2938       _Fail("Can't find device %s", cf)
2939     bdevs.append(rd)
2940   return bdevs
2941
2942
2943 def DrbdDisconnectNet(nodes_ip, disks):
2944   """Disconnects the network on a list of drbd devices.
2945
2946   """
2947   bdevs = _FindDisks(nodes_ip, disks)
2948
2949   # disconnect disks
2950   for rd in bdevs:
2951     try:
2952       rd.DisconnectNet()
2953     except errors.BlockDeviceError, err:
2954       _Fail("Can't change network configuration to standalone mode: %s",
2955             err, exc=True)
2956
2957
2958 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2959   """Attaches the network on a list of drbd devices.
2960
2961   """
2962   bdevs = _FindDisks(nodes_ip, disks)
2963
2964   if multimaster:
2965     for idx, rd in enumerate(bdevs):
2966       try:
2967         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2968       except EnvironmentError, err:
2969         _Fail("Can't create symlink: %s", err)
2970   # reconnect disks, switch to new master configuration and if
2971   # needed primary mode
2972   for rd in bdevs:
2973     try:
2974       rd.AttachNet(multimaster)
2975     except errors.BlockDeviceError, err:
2976       _Fail("Can't change network configuration: %s", err)
2977
2978   # wait until the disks are connected; we need to retry the re-attach
2979   # if the device becomes standalone, as this might happen if the one
2980   # node disconnects and reconnects in a different mode before the
2981   # other node reconnects; in this case, one or both of the nodes will
2982   # decide it has wrong configuration and switch to standalone
2983
2984   def _Attach():
2985     all_connected = True
2986
2987     for rd in bdevs:
2988       stats = rd.GetProcStatus()
2989
2990       all_connected = (all_connected and
2991                        (stats.is_connected or stats.is_in_resync))
2992
2993       if stats.is_standalone:
2994         # peer had different config info and this node became
2995         # standalone, even though this should not happen with the
2996         # new staged way of changing disk configs
2997         try:
2998           rd.AttachNet(multimaster)
2999         except errors.BlockDeviceError, err:
3000           _Fail("Can't change network configuration: %s", err)
3001
3002     if not all_connected:
3003       raise utils.RetryAgain()
3004
3005   try:
3006     # Start with a delay of 100 miliseconds and go up to 5 seconds
3007     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3008   except utils.RetryTimeout:
3009     _Fail("Timeout in disk reconnecting")
3010
3011   if multimaster:
3012     # change to primary mode
3013     for rd in bdevs:
3014       try:
3015         rd.Open()
3016       except errors.BlockDeviceError, err:
3017         _Fail("Can't change to primary mode: %s", err)
3018
3019
3020 def DrbdWaitSync(nodes_ip, disks):
3021   """Wait until DRBDs have synchronized.
3022
3023   """
3024   def _helper(rd):
3025     stats = rd.GetProcStatus()
3026     if not (stats.is_connected or stats.is_in_resync):
3027       raise utils.RetryAgain()
3028     return stats
3029
3030   bdevs = _FindDisks(nodes_ip, disks)
3031
3032   min_resync = 100
3033   alldone = True
3034   for rd in bdevs:
3035     try:
3036       # poll each second for 15 seconds
3037       stats = utils.Retry(_helper, 1, 15, args=[rd])
3038     except utils.RetryTimeout:
3039       stats = rd.GetProcStatus()
3040       # last check
3041       if not (stats.is_connected or stats.is_in_resync):
3042         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3043     alldone = alldone and (not stats.is_in_resync)
3044     if stats.sync_percent is not None:
3045       min_resync = min(min_resync, stats.sync_percent)
3046
3047   return (alldone, min_resync)
3048
3049
3050 def GetDrbdUsermodeHelper():
3051   """Returns DRBD usermode helper currently configured.
3052
3053   """
3054   try:
3055     return bdev.BaseDRBD.GetUsermodeHelper()
3056   except errors.BlockDeviceError, err:
3057     _Fail(str(err))
3058
3059
3060 def PowercycleNode(hypervisor_type):
3061   """Hard-powercycle the node.
3062
3063   Because we need to return first, and schedule the powercycle in the
3064   background, we won't be able to report failures nicely.
3065
3066   """
3067   hyper = hypervisor.GetHypervisor(hypervisor_type)
3068   try:
3069     pid = os.fork()
3070   except OSError:
3071     # if we can't fork, we'll pretend that we're in the child process
3072     pid = 0
3073   if pid > 0:
3074     return "Reboot scheduled in 5 seconds"
3075   # ensure the child is running on ram
3076   try:
3077     utils.Mlockall()
3078   except Exception: # pylint: disable-msg=W0703
3079     pass
3080   time.sleep(5)
3081   hyper.PowercycleNode()
3082
3083
3084 class HooksRunner(object):
3085   """Hook runner.
3086
3087   This class is instantiated on the node side (ganeti-noded) and not
3088   on the master side.
3089
3090   """
3091   def __init__(self, hooks_base_dir=None):
3092     """Constructor for hooks runner.
3093
3094     @type hooks_base_dir: str or None
3095     @param hooks_base_dir: if not None, this overrides the
3096         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3097
3098     """
3099     if hooks_base_dir is None:
3100       hooks_base_dir = constants.HOOKS_BASE_DIR
3101     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3102     # constant
3103     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3104
3105   def RunHooks(self, hpath, phase, env):
3106     """Run the scripts in the hooks directory.
3107
3108     @type hpath: str
3109     @param hpath: the path to the hooks directory which
3110         holds the scripts
3111     @type phase: str
3112     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3113         L{constants.HOOKS_PHASE_POST}
3114     @type env: dict
3115     @param env: dictionary with the environment for the hook
3116     @rtype: list
3117     @return: list of 3-element tuples:
3118       - script path
3119       - script result, either L{constants.HKR_SUCCESS} or
3120         L{constants.HKR_FAIL}
3121       - output of the script
3122
3123     @raise errors.ProgrammerError: for invalid input
3124         parameters
3125
3126     """
3127     if phase == constants.HOOKS_PHASE_PRE:
3128       suffix = "pre"
3129     elif phase == constants.HOOKS_PHASE_POST:
3130       suffix = "post"
3131     else:
3132       _Fail("Unknown hooks phase '%s'", phase)
3133
3134
3135     subdir = "%s-%s.d" % (hpath, suffix)
3136     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3137
3138     results = []
3139
3140     if not os.path.isdir(dir_name):
3141       # for non-existing/non-dirs, we simply exit instead of logging a
3142       # warning at every operation
3143       return results
3144
3145     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3146
3147     for (relname, relstatus, runresult)  in runparts_results:
3148       if relstatus == constants.RUNPARTS_SKIP:
3149         rrval = constants.HKR_SKIP
3150         output = ""
3151       elif relstatus == constants.RUNPARTS_ERR:
3152         rrval = constants.HKR_FAIL
3153         output = "Hook script execution error: %s" % runresult
3154       elif relstatus == constants.RUNPARTS_RUN:
3155         if runresult.failed:
3156           rrval = constants.HKR_FAIL
3157         else:
3158           rrval = constants.HKR_SUCCESS
3159         output = utils.SafeEncode(runresult.output.strip())
3160       results.append(("%s/%s" % (subdir, relname), rrval, output))
3161
3162     return results
3163
3164
3165 class IAllocatorRunner(object):
3166   """IAllocator runner.
3167
3168   This class is instantiated on the node side (ganeti-noded) and not on
3169   the master side.
3170
3171   """
3172   @staticmethod
3173   def Run(name, idata):
3174     """Run an iallocator script.
3175
3176     @type name: str
3177     @param name: the iallocator script name
3178     @type idata: str
3179     @param idata: the allocator input data
3180
3181     @rtype: tuple
3182     @return: two element tuple of:
3183        - status
3184        - either error message or stdout of allocator (for success)
3185
3186     """
3187     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3188                                   os.path.isfile)
3189     if alloc_script is None:
3190       _Fail("iallocator module '%s' not found in the search path", name)
3191
3192     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3193     try:
3194       os.write(fd, idata)
3195       os.close(fd)
3196       result = utils.RunCmd([alloc_script, fin_name])
3197       if result.failed:
3198         _Fail("iallocator module '%s' failed: %s, output '%s'",
3199               name, result.fail_reason, result.output)
3200     finally:
3201       os.unlink(fin_name)
3202
3203     return result.stdout
3204
3205
3206 class DevCacheManager(object):
3207   """Simple class for managing a cache of block device information.
3208
3209   """
3210   _DEV_PREFIX = "/dev/"
3211   _ROOT_DIR = constants.BDEV_CACHE_DIR
3212
3213   @classmethod
3214   def _ConvertPath(cls, dev_path):
3215     """Converts a /dev/name path to the cache file name.
3216
3217     This replaces slashes with underscores and strips the /dev
3218     prefix. It then returns the full path to the cache file.
3219
3220     @type dev_path: str
3221     @param dev_path: the C{/dev/} path name
3222     @rtype: str
3223     @return: the converted path name
3224
3225     """
3226     if dev_path.startswith(cls._DEV_PREFIX):
3227       dev_path = dev_path[len(cls._DEV_PREFIX):]
3228     dev_path = dev_path.replace("/", "_")
3229     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3230     return fpath
3231
3232   @classmethod
3233   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3234     """Updates the cache information for a given device.
3235
3236     @type dev_path: str
3237     @param dev_path: the pathname of the device
3238     @type owner: str
3239     @param owner: the owner (instance name) of the device
3240     @type on_primary: bool
3241     @param on_primary: whether this is the primary
3242         node nor not
3243     @type iv_name: str
3244     @param iv_name: the instance-visible name of the
3245         device, as in objects.Disk.iv_name
3246
3247     @rtype: None
3248
3249     """
3250     if dev_path is None:
3251       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3252       return
3253     fpath = cls._ConvertPath(dev_path)
3254     if on_primary:
3255       state = "primary"
3256     else:
3257       state = "secondary"
3258     if iv_name is None:
3259       iv_name = "not_visible"
3260     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3261     try:
3262       utils.WriteFile(fpath, data=fdata)
3263     except EnvironmentError, err:
3264       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3265
3266   @classmethod
3267   def RemoveCache(cls, dev_path):
3268     """Remove data for a dev_path.
3269
3270     This is just a wrapper over L{utils.RemoveFile} with a converted
3271     path name and logging.
3272
3273     @type dev_path: str
3274     @param dev_path: the pathname of the device
3275
3276     @rtype: None
3277
3278     """
3279     if dev_path is None:
3280       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3281       return
3282     fpath = cls._ConvertPath(dev_path)
3283     try:
3284       utils.RemoveFile(fpath)
3285     except EnvironmentError, err:
3286       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)