Improve import/export timeout settings
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79
80 class RPCFail(Exception):
81   """Class denoting RPC failure.
82
83   Its argument is the error message.
84
85   """
86
87
88 def _Fail(msg, *args, **kwargs):
89   """Log an error and the raise an RPCFail exception.
90
91   This exception is then handled specially in the ganeti daemon and
92   turned into a 'failed' return type. As such, this function is a
93   useful shortcut for logging the error and returning it to the master
94   daemon.
95
96   @type msg: string
97   @param msg: the text of the exception
98   @raise RPCFail
99
100   """
101   if args:
102     msg = msg % args
103   if "log" not in kwargs or kwargs["log"]: # if we should log this error
104     if "exc" in kwargs and kwargs["exc"]:
105       logging.exception(msg)
106     else:
107       logging.error(msg)
108   raise RPCFail(msg)
109
110
111 def _GetConfig():
112   """Simple wrapper to return a SimpleStore.
113
114   @rtype: L{ssconf.SimpleStore}
115   @return: a SimpleStore instance
116
117   """
118   return ssconf.SimpleStore()
119
120
121 def _GetSshRunner(cluster_name):
122   """Simple wrapper to return an SshRunner.
123
124   @type cluster_name: str
125   @param cluster_name: the cluster name, which is needed
126       by the SshRunner constructor
127   @rtype: L{ssh.SshRunner}
128   @return: an SshRunner instance
129
130   """
131   return ssh.SshRunner(cluster_name)
132
133
134 def _Decompress(data):
135   """Unpacks data compressed by the RPC client.
136
137   @type data: list or tuple
138   @param data: Data sent by RPC client
139   @rtype: str
140   @return: Decompressed data
141
142   """
143   assert isinstance(data, (list, tuple))
144   assert len(data) == 2
145   (encoding, content) = data
146   if encoding == constants.RPC_ENCODING_NONE:
147     return content
148   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
149     return zlib.decompress(base64.b64decode(content))
150   else:
151     raise AssertionError("Unknown data encoding")
152
153
154 def _CleanDirectory(path, exclude=None):
155   """Removes all regular files in a directory.
156
157   @type path: str
158   @param path: the directory to clean
159   @type exclude: list
160   @param exclude: list of files to be excluded, defaults
161       to the empty list
162
163   """
164   if path not in _ALLOWED_CLEAN_DIRS:
165     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
166           path)
167
168   if not os.path.isdir(path):
169     return
170   if exclude is None:
171     exclude = []
172   else:
173     # Normalize excluded paths
174     exclude = [os.path.normpath(i) for i in exclude]
175
176   for rel_name in utils.ListVisibleFiles(path):
177     full_name = utils.PathJoin(path, rel_name)
178     if full_name in exclude:
179       continue
180     if os.path.isfile(full_name) and not os.path.islink(full_name):
181       utils.RemoveFile(full_name)
182
183
184 def _BuildUploadFileList():
185   """Build the list of allowed upload files.
186
187   This is abstracted so that it's built only once at module import time.
188
189   """
190   allowed_files = set([
191     constants.CLUSTER_CONF_FILE,
192     constants.ETC_HOSTS,
193     constants.SSH_KNOWN_HOSTS_FILE,
194     constants.VNC_PASSWORD_FILE,
195     constants.RAPI_CERT_FILE,
196     constants.RAPI_USERS_FILE,
197     constants.CONFD_HMAC_KEY,
198     constants.CLUSTER_DOMAIN_SECRET_FILE,
199     ])
200
201   for hv_name in constants.HYPER_TYPES:
202     hv_class = hypervisor.GetHypervisorClass(hv_name)
203     allowed_files.update(hv_class.GetAncillaryFiles())
204
205   return frozenset(allowed_files)
206
207
208 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
209
210
211 def JobQueuePurge():
212   """Removes job queue files and archived jobs.
213
214   @rtype: tuple
215   @return: True, None
216
217   """
218   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
219   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
220
221
222 def GetMasterInfo():
223   """Returns master information.
224
225   This is an utility function to compute master information, either
226   for consumption here or from the node daemon.
227
228   @rtype: tuple
229   @return: master_netdev, master_ip, master_name, primary_ip_family
230   @raise RPCFail: in case of errors
231
232   """
233   try:
234     cfg = _GetConfig()
235     master_netdev = cfg.GetMasterNetdev()
236     master_ip = cfg.GetMasterIP()
237     master_node = cfg.GetMasterNode()
238     primary_ip_family = cfg.GetPrimaryIPFamily()
239   except errors.ConfigurationError, err:
240     _Fail("Cluster configuration incomplete: %s", err, exc=True)
241   return (master_netdev, master_ip, master_node, primary_ip_family)
242
243
244 def StartMaster(start_daemons, no_voting):
245   """Activate local node as master node.
246
247   The function will either try activate the IP address of the master
248   (unless someone else has it) or also start the master daemons, based
249   on the start_daemons parameter.
250
251   @type start_daemons: boolean
252   @param start_daemons: whether to start the master daemons
253       (ganeti-masterd and ganeti-rapi), or (if false) activate the
254       master ip
255   @type no_voting: boolean
256   @param no_voting: whether to start ganeti-masterd without a node vote
257       (if start_daemons is True), but still non-interactively
258   @rtype: None
259
260   """
261   # GetMasterInfo will raise an exception if not able to return data
262   master_netdev, master_ip, _, family = GetMasterInfo()
263
264   err_msgs = []
265   # either start the master and rapi daemons
266   if start_daemons:
267     if no_voting:
268       masterd_args = "--no-voting --yes-do-it"
269     else:
270       masterd_args = ""
271
272     env = {
273       "EXTRA_MASTERD_ARGS": masterd_args,
274       }
275
276     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
277     if result.failed:
278       msg = "Can't start Ganeti master: %s" % result.output
279       logging.error(msg)
280       err_msgs.append(msg)
281   # or activate the IP
282   else:
283     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
284       if netutils.IPAddress.Own(master_ip):
285         # we already have the ip:
286         logging.debug("Master IP already configured, doing nothing")
287       else:
288         msg = "Someone else has the master ip, not activating"
289         logging.error(msg)
290         err_msgs.append(msg)
291     else:
292       ipcls = netutils.IP4Address
293       if family == netutils.IP6Address.family:
294         ipcls = netutils.IP6Address
295
296       result = utils.RunCmd(["ip", "address", "add",
297                              "%s/%d" % (master_ip, ipcls.iplen),
298                              "dev", master_netdev, "label",
299                              "%s:0" % master_netdev])
300       if result.failed:
301         msg = "Can't activate master IP: %s" % result.output
302         logging.error(msg)
303         err_msgs.append(msg)
304
305       # we ignore the exit code of the following cmds
306       if ipcls == netutils.IP4Address:
307         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
308                       master_ip, master_ip])
309       elif ipcls == netutils.IP6Address:
310         try:
311           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
312         except errors.OpExecError:
313           # TODO: Better error reporting
314           logging.warning("Can't execute ndisc6, please install if missing")
315
316   if err_msgs:
317     _Fail("; ".join(err_msgs))
318
319
320 def StopMaster(stop_daemons):
321   """Deactivate this node as master.
322
323   The function will always try to deactivate the IP address of the
324   master. It will also stop the master daemons depending on the
325   stop_daemons parameter.
326
327   @type stop_daemons: boolean
328   @param stop_daemons: whether to also stop the master daemons
329       (ganeti-masterd and ganeti-rapi)
330   @rtype: None
331
332   """
333   # TODO: log and report back to the caller the error failures; we
334   # need to decide in which case we fail the RPC for this
335
336   # GetMasterInfo will raise an exception if not able to return data
337   master_netdev, master_ip, _, family = GetMasterInfo()
338
339   ipcls = netutils.IP4Address
340   if family == netutils.IP6Address.family:
341     ipcls = netutils.IP6Address
342
343   result = utils.RunCmd(["ip", "address", "del",
344                          "%s/%d" % (master_ip, ipcls.iplen),
345                          "dev", master_netdev])
346   if result.failed:
347     logging.error("Can't remove the master IP, error: %s", result.output)
348     # but otherwise ignore the failure
349
350   if stop_daemons:
351     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
352     if result.failed:
353       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
354                     " and error %s",
355                     result.cmd, result.exit_code, result.output)
356
357
358 def EtcHostsModify(mode, host, ip):
359   """Modify a host entry in /etc/hosts.
360
361   @param mode: The mode to operate. Either add or remove entry
362   @param host: The host to operate on
363   @param ip: The ip associated with the entry
364
365   """
366   if mode == constants.ETC_HOSTS_ADD:
367     if not ip:
368       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
369               " present")
370     utils.AddHostToEtcHosts(host, ip)
371   elif mode == constants.ETC_HOSTS_REMOVE:
372     if ip:
373       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
374               " parameter is present")
375     utils.RemoveHostFromEtcHosts(host)
376   else:
377     RPCFail("Mode not supported")
378
379
380 def LeaveCluster(modify_ssh_setup):
381   """Cleans up and remove the current node.
382
383   This function cleans up and prepares the current node to be removed
384   from the cluster.
385
386   If processing is successful, then it raises an
387   L{errors.QuitGanetiException} which is used as a special case to
388   shutdown the node daemon.
389
390   @param modify_ssh_setup: boolean
391
392   """
393   _CleanDirectory(constants.DATA_DIR)
394   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
395   JobQueuePurge()
396
397   if modify_ssh_setup:
398     try:
399       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
400
401       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
402
403       utils.RemoveFile(priv_key)
404       utils.RemoveFile(pub_key)
405     except errors.OpExecError:
406       logging.exception("Error while processing ssh files")
407
408   try:
409     utils.RemoveFile(constants.CONFD_HMAC_KEY)
410     utils.RemoveFile(constants.RAPI_CERT_FILE)
411     utils.RemoveFile(constants.NODED_CERT_FILE)
412   except: # pylint: disable-msg=W0702
413     logging.exception("Error while removing cluster secrets")
414
415   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
416   if result.failed:
417     logging.error("Command %s failed with exitcode %s and error %s",
418                   result.cmd, result.exit_code, result.output)
419
420   # Raise a custom exception (handled in ganeti-noded)
421   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
422
423
424 def GetNodeInfo(vgname, hypervisor_type):
425   """Gives back a hash with different information about the node.
426
427   @type vgname: C{string}
428   @param vgname: the name of the volume group to ask for disk space information
429   @type hypervisor_type: C{str}
430   @param hypervisor_type: the name of the hypervisor to ask for
431       memory information
432   @rtype: C{dict}
433   @return: dictionary with the following keys:
434       - vg_size is the size of the configured volume group in MiB
435       - vg_free is the free size of the volume group in MiB
436       - memory_dom0 is the memory allocated for domain0 in MiB
437       - memory_free is the currently available (free) ram in MiB
438       - memory_total is the total number of ram in MiB
439
440   """
441   outputarray = {}
442
443   vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
444   vg_free = vg_size = None
445   if vginfo:
446     vg_free = int(round(vginfo[0][0], 0))
447     vg_size = int(round(vginfo[0][1], 0))
448
449   outputarray['vg_size'] = vg_size
450   outputarray['vg_free'] = vg_free
451
452   hyper = hypervisor.GetHypervisor(hypervisor_type)
453   hyp_info = hyper.GetNodeInfo()
454   if hyp_info is not None:
455     outputarray.update(hyp_info)
456
457   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
458
459   return outputarray
460
461
462 def VerifyNode(what, cluster_name):
463   """Verify the status of the local node.
464
465   Based on the input L{what} parameter, various checks are done on the
466   local node.
467
468   If the I{filelist} key is present, this list of
469   files is checksummed and the file/checksum pairs are returned.
470
471   If the I{nodelist} key is present, we check that we have
472   connectivity via ssh with the target nodes (and check the hostname
473   report).
474
475   If the I{node-net-test} key is present, we check that we have
476   connectivity to the given nodes via both primary IP and, if
477   applicable, secondary IPs.
478
479   @type what: C{dict}
480   @param what: a dictionary of things to check:
481       - filelist: list of files for which to compute checksums
482       - nodelist: list of nodes we should check ssh communication with
483       - node-net-test: list of nodes we should check node daemon port
484         connectivity with
485       - hypervisor: list with hypervisors to run the verify for
486   @rtype: dict
487   @return: a dictionary with the same keys as the input dict, and
488       values representing the result of the checks
489
490   """
491   result = {}
492   my_name = netutils.Hostname.GetSysName()
493   port = netutils.GetDaemonPort(constants.NODED)
494   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
495
496   if constants.NV_HYPERVISOR in what and vm_capable:
497     result[constants.NV_HYPERVISOR] = tmp = {}
498     for hv_name in what[constants.NV_HYPERVISOR]:
499       try:
500         val = hypervisor.GetHypervisor(hv_name).Verify()
501       except errors.HypervisorError, err:
502         val = "Error while checking hypervisor: %s" % str(err)
503       tmp[hv_name] = val
504
505   if constants.NV_FILELIST in what:
506     result[constants.NV_FILELIST] = utils.FingerprintFiles(
507       what[constants.NV_FILELIST])
508
509   if constants.NV_NODELIST in what:
510     result[constants.NV_NODELIST] = tmp = {}
511     random.shuffle(what[constants.NV_NODELIST])
512     for node in what[constants.NV_NODELIST]:
513       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
514       if not success:
515         tmp[node] = message
516
517   if constants.NV_NODENETTEST in what:
518     result[constants.NV_NODENETTEST] = tmp = {}
519     my_pip = my_sip = None
520     for name, pip, sip in what[constants.NV_NODENETTEST]:
521       if name == my_name:
522         my_pip = pip
523         my_sip = sip
524         break
525     if not my_pip:
526       tmp[my_name] = ("Can't find my own primary/secondary IP"
527                       " in the node list")
528     else:
529       for name, pip, sip in what[constants.NV_NODENETTEST]:
530         fail = []
531         if not netutils.TcpPing(pip, port, source=my_pip):
532           fail.append("primary")
533         if sip != pip:
534           if not netutils.TcpPing(sip, port, source=my_sip):
535             fail.append("secondary")
536         if fail:
537           tmp[name] = ("failure using the %s interface(s)" %
538                        " and ".join(fail))
539
540   if constants.NV_MASTERIP in what:
541     # FIXME: add checks on incoming data structures (here and in the
542     # rest of the function)
543     master_name, master_ip = what[constants.NV_MASTERIP]
544     if master_name == my_name:
545       source = constants.IP4_ADDRESS_LOCALHOST
546     else:
547       source = None
548     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
549                                                   source=source)
550
551   if constants.NV_LVLIST in what and vm_capable:
552     try:
553       val = GetVolumeList(what[constants.NV_LVLIST])
554     except RPCFail, err:
555       val = str(err)
556     result[constants.NV_LVLIST] = val
557
558   if constants.NV_INSTANCELIST in what and vm_capable:
559     # GetInstanceList can fail
560     try:
561       val = GetInstanceList(what[constants.NV_INSTANCELIST])
562     except RPCFail, err:
563       val = str(err)
564     result[constants.NV_INSTANCELIST] = val
565
566   if constants.NV_VGLIST in what and vm_capable:
567     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
568
569   if constants.NV_PVLIST in what and vm_capable:
570     result[constants.NV_PVLIST] = \
571       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
572                                    filter_allocatable=False)
573
574   if constants.NV_VERSION in what:
575     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
576                                     constants.RELEASE_VERSION)
577
578   if constants.NV_HVINFO in what and vm_capable:
579     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
580     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
581
582   if constants.NV_DRBDLIST in what and vm_capable:
583     try:
584       used_minors = bdev.DRBD8.GetUsedDevs().keys()
585     except errors.BlockDeviceError, err:
586       logging.warning("Can't get used minors list", exc_info=True)
587       used_minors = str(err)
588     result[constants.NV_DRBDLIST] = used_minors
589
590   if constants.NV_DRBDHELPER in what and vm_capable:
591     status = True
592     try:
593       payload = bdev.BaseDRBD.GetUsermodeHelper()
594     except errors.BlockDeviceError, err:
595       logging.error("Can't get DRBD usermode helper: %s", str(err))
596       status = False
597       payload = str(err)
598     result[constants.NV_DRBDHELPER] = (status, payload)
599
600   if constants.NV_NODESETUP in what:
601     result[constants.NV_NODESETUP] = tmpr = []
602     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
603       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
604                   " under /sys, missing required directories /sys/block"
605                   " and /sys/class/net")
606     if (not os.path.isdir("/proc/sys") or
607         not os.path.isfile("/proc/sysrq-trigger")):
608       tmpr.append("The procfs filesystem doesn't seem to be mounted"
609                   " under /proc, missing required directory /proc/sys and"
610                   " the file /proc/sysrq-trigger")
611
612   if constants.NV_TIME in what:
613     result[constants.NV_TIME] = utils.SplitTime(time.time())
614
615   if constants.NV_OSLIST in what and vm_capable:
616     result[constants.NV_OSLIST] = DiagnoseOS()
617
618   return result
619
620
621 def GetVolumeList(vg_name):
622   """Compute list of logical volumes and their size.
623
624   @type vg_name: str
625   @param vg_name: the volume group whose LVs we should list
626   @rtype: dict
627   @return:
628       dictionary of all partions (key) with value being a tuple of
629       their size (in MiB), inactive and online status::
630
631         {'test1': ('20.06', True, True)}
632
633       in case of errors, a string is returned with the error
634       details.
635
636   """
637   lvs = {}
638   sep = '|'
639   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
640                          "--separator=%s" % sep,
641                          "-olv_name,lv_size,lv_attr", vg_name])
642   if result.failed:
643     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
644
645   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
646   for line in result.stdout.splitlines():
647     line = line.strip()
648     match = valid_line_re.match(line)
649     if not match:
650       logging.error("Invalid line returned from lvs output: '%s'", line)
651       continue
652     name, size, attr = match.groups()
653     inactive = attr[4] == '-'
654     online = attr[5] == 'o'
655     virtual = attr[0] == 'v'
656     if virtual:
657       # we don't want to report such volumes as existing, since they
658       # don't really hold data
659       continue
660     lvs[name] = (size, inactive, online)
661
662   return lvs
663
664
665 def ListVolumeGroups():
666   """List the volume groups and their size.
667
668   @rtype: dict
669   @return: dictionary with keys volume name and values the
670       size of the volume
671
672   """
673   return utils.ListVolumeGroups()
674
675
676 def NodeVolumes():
677   """List all volumes on this node.
678
679   @rtype: list
680   @return:
681     A list of dictionaries, each having four keys:
682       - name: the logical volume name,
683       - size: the size of the logical volume
684       - dev: the physical device on which the LV lives
685       - vg: the volume group to which it belongs
686
687     In case of errors, we return an empty list and log the
688     error.
689
690     Note that since a logical volume can live on multiple physical
691     volumes, the resulting list might include a logical volume
692     multiple times.
693
694   """
695   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
696                          "--separator=|",
697                          "--options=lv_name,lv_size,devices,vg_name"])
698   if result.failed:
699     _Fail("Failed to list logical volumes, lvs output: %s",
700           result.output)
701
702   def parse_dev(dev):
703     return dev.split('(')[0]
704
705   def handle_dev(dev):
706     return [parse_dev(x) for x in dev.split(",")]
707
708   def map_line(line):
709     line = [v.strip() for v in line]
710     return [{'name': line[0], 'size': line[1],
711              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
712
713   all_devs = []
714   for line in result.stdout.splitlines():
715     if line.count('|') >= 3:
716       all_devs.extend(map_line(line.split('|')))
717     else:
718       logging.warning("Strange line in the output from lvs: '%s'", line)
719   return all_devs
720
721
722 def BridgesExist(bridges_list):
723   """Check if a list of bridges exist on the current node.
724
725   @rtype: boolean
726   @return: C{True} if all of them exist, C{False} otherwise
727
728   """
729   missing = []
730   for bridge in bridges_list:
731     if not utils.BridgeExists(bridge):
732       missing.append(bridge)
733
734   if missing:
735     _Fail("Missing bridges %s", utils.CommaJoin(missing))
736
737
738 def GetInstanceList(hypervisor_list):
739   """Provides a list of instances.
740
741   @type hypervisor_list: list
742   @param hypervisor_list: the list of hypervisors to query information
743
744   @rtype: list
745   @return: a list of all running instances on the current node
746     - instance1.example.com
747     - instance2.example.com
748
749   """
750   results = []
751   for hname in hypervisor_list:
752     try:
753       names = hypervisor.GetHypervisor(hname).ListInstances()
754       results.extend(names)
755     except errors.HypervisorError, err:
756       _Fail("Error enumerating instances (hypervisor %s): %s",
757             hname, err, exc=True)
758
759   return results
760
761
762 def GetInstanceInfo(instance, hname):
763   """Gives back the information about an instance as a dictionary.
764
765   @type instance: string
766   @param instance: the instance name
767   @type hname: string
768   @param hname: the hypervisor type of the instance
769
770   @rtype: dict
771   @return: dictionary with the following keys:
772       - memory: memory size of instance (int)
773       - state: xen state of instance (string)
774       - time: cpu time of instance (float)
775
776   """
777   output = {}
778
779   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
780   if iinfo is not None:
781     output['memory'] = iinfo[2]
782     output['state'] = iinfo[4]
783     output['time'] = iinfo[5]
784
785   return output
786
787
788 def GetInstanceMigratable(instance):
789   """Gives whether an instance can be migrated.
790
791   @type instance: L{objects.Instance}
792   @param instance: object representing the instance to be checked.
793
794   @rtype: tuple
795   @return: tuple of (result, description) where:
796       - result: whether the instance can be migrated or not
797       - description: a description of the issue, if relevant
798
799   """
800   hyper = hypervisor.GetHypervisor(instance.hypervisor)
801   iname = instance.name
802   if iname not in hyper.ListInstances():
803     _Fail("Instance %s is not running", iname)
804
805   for idx in range(len(instance.disks)):
806     link_name = _GetBlockDevSymlinkPath(iname, idx)
807     if not os.path.islink(link_name):
808       logging.warning("Instance %s is missing symlink %s for disk %d",
809                       iname, link_name, idx)
810
811
812 def GetAllInstancesInfo(hypervisor_list):
813   """Gather data about all instances.
814
815   This is the equivalent of L{GetInstanceInfo}, except that it
816   computes data for all instances at once, thus being faster if one
817   needs data about more than one instance.
818
819   @type hypervisor_list: list
820   @param hypervisor_list: list of hypervisors to query for instance data
821
822   @rtype: dict
823   @return: dictionary of instance: data, with data having the following keys:
824       - memory: memory size of instance (int)
825       - state: xen state of instance (string)
826       - time: cpu time of instance (float)
827       - vcpus: the number of vcpus
828
829   """
830   output = {}
831
832   for hname in hypervisor_list:
833     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
834     if iinfo:
835       for name, _, memory, vcpus, state, times in iinfo:
836         value = {
837           'memory': memory,
838           'vcpus': vcpus,
839           'state': state,
840           'time': times,
841           }
842         if name in output:
843           # we only check static parameters, like memory and vcpus,
844           # and not state and time which can change between the
845           # invocations of the different hypervisors
846           for key in 'memory', 'vcpus':
847             if value[key] != output[name][key]:
848               _Fail("Instance %s is running twice"
849                     " with different parameters", name)
850         output[name] = value
851
852   return output
853
854
855 def _InstanceLogName(kind, os_name, instance):
856   """Compute the OS log filename for a given instance and operation.
857
858   The instance name and os name are passed in as strings since not all
859   operations have these as part of an instance object.
860
861   @type kind: string
862   @param kind: the operation type (e.g. add, import, etc.)
863   @type os_name: string
864   @param os_name: the os name
865   @type instance: string
866   @param instance: the name of the instance being imported/added/etc.
867
868   """
869   # TODO: Use tempfile.mkstemp to create unique filename
870   base = ("%s-%s-%s-%s.log" %
871           (kind, os_name, instance, utils.TimestampForFilename()))
872   return utils.PathJoin(constants.LOG_OS_DIR, base)
873
874
875 def InstanceOsAdd(instance, reinstall, debug):
876   """Add an OS to an instance.
877
878   @type instance: L{objects.Instance}
879   @param instance: Instance whose OS is to be installed
880   @type reinstall: boolean
881   @param reinstall: whether this is an instance reinstall
882   @type debug: integer
883   @param debug: debug level, passed to the OS scripts
884   @rtype: None
885
886   """
887   inst_os = OSFromDisk(instance.os)
888
889   create_env = OSEnvironment(instance, inst_os, debug)
890   if reinstall:
891     create_env['INSTANCE_REINSTALL'] = "1"
892
893   logfile = _InstanceLogName("add", instance.os, instance.name)
894
895   result = utils.RunCmd([inst_os.create_script], env=create_env,
896                         cwd=inst_os.path, output=logfile,)
897   if result.failed:
898     logging.error("os create command '%s' returned error: %s, logfile: %s,"
899                   " output: %s", result.cmd, result.fail_reason, logfile,
900                   result.output)
901     lines = [utils.SafeEncode(val)
902              for val in utils.TailFile(logfile, lines=20)]
903     _Fail("OS create script failed (%s), last lines in the"
904           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
905
906
907 def RunRenameInstance(instance, old_name, debug):
908   """Run the OS rename script for an instance.
909
910   @type instance: L{objects.Instance}
911   @param instance: Instance whose OS is to be installed
912   @type old_name: string
913   @param old_name: previous instance name
914   @type debug: integer
915   @param debug: debug level, passed to the OS scripts
916   @rtype: boolean
917   @return: the success of the operation
918
919   """
920   inst_os = OSFromDisk(instance.os)
921
922   rename_env = OSEnvironment(instance, inst_os, debug)
923   rename_env['OLD_INSTANCE_NAME'] = old_name
924
925   logfile = _InstanceLogName("rename", instance.os,
926                              "%s-%s" % (old_name, instance.name))
927
928   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
929                         cwd=inst_os.path, output=logfile)
930
931   if result.failed:
932     logging.error("os create command '%s' returned error: %s output: %s",
933                   result.cmd, result.fail_reason, result.output)
934     lines = [utils.SafeEncode(val)
935              for val in utils.TailFile(logfile, lines=20)]
936     _Fail("OS rename script failed (%s), last lines in the"
937           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
938
939
940 def _GetBlockDevSymlinkPath(instance_name, idx):
941   return utils.PathJoin(constants.DISK_LINKS_DIR,
942                         "%s:%d" % (instance_name, idx))
943
944
945 def _SymlinkBlockDev(instance_name, device_path, idx):
946   """Set up symlinks to a instance's block device.
947
948   This is an auxiliary function run when an instance is start (on the primary
949   node) or when an instance is migrated (on the target node).
950
951
952   @param instance_name: the name of the target instance
953   @param device_path: path of the physical block device, on the node
954   @param idx: the disk index
955   @return: absolute path to the disk's symlink
956
957   """
958   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
959   try:
960     os.symlink(device_path, link_name)
961   except OSError, err:
962     if err.errno == errno.EEXIST:
963       if (not os.path.islink(link_name) or
964           os.readlink(link_name) != device_path):
965         os.remove(link_name)
966         os.symlink(device_path, link_name)
967     else:
968       raise
969
970   return link_name
971
972
973 def _RemoveBlockDevLinks(instance_name, disks):
974   """Remove the block device symlinks belonging to the given instance.
975
976   """
977   for idx, _ in enumerate(disks):
978     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
979     if os.path.islink(link_name):
980       try:
981         os.remove(link_name)
982       except OSError:
983         logging.exception("Can't remove symlink '%s'", link_name)
984
985
986 def _GatherAndLinkBlockDevs(instance):
987   """Set up an instance's block device(s).
988
989   This is run on the primary node at instance startup. The block
990   devices must be already assembled.
991
992   @type instance: L{objects.Instance}
993   @param instance: the instance whose disks we shoul assemble
994   @rtype: list
995   @return: list of (disk_object, device_path)
996
997   """
998   block_devices = []
999   for idx, disk in enumerate(instance.disks):
1000     device = _RecursiveFindBD(disk)
1001     if device is None:
1002       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1003                                     str(disk))
1004     device.Open()
1005     try:
1006       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1007     except OSError, e:
1008       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1009                                     e.strerror)
1010
1011     block_devices.append((disk, link_name))
1012
1013   return block_devices
1014
1015
1016 def StartInstance(instance):
1017   """Start an instance.
1018
1019   @type instance: L{objects.Instance}
1020   @param instance: the instance object
1021   @rtype: None
1022
1023   """
1024   running_instances = GetInstanceList([instance.hypervisor])
1025
1026   if instance.name in running_instances:
1027     logging.info("Instance %s already running, not starting", instance.name)
1028     return
1029
1030   try:
1031     block_devices = _GatherAndLinkBlockDevs(instance)
1032     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1033     hyper.StartInstance(instance, block_devices)
1034   except errors.BlockDeviceError, err:
1035     _Fail("Block device error: %s", err, exc=True)
1036   except errors.HypervisorError, err:
1037     _RemoveBlockDevLinks(instance.name, instance.disks)
1038     _Fail("Hypervisor error: %s", err, exc=True)
1039
1040
1041 def InstanceShutdown(instance, timeout):
1042   """Shut an instance down.
1043
1044   @note: this functions uses polling with a hardcoded timeout.
1045
1046   @type instance: L{objects.Instance}
1047   @param instance: the instance object
1048   @type timeout: integer
1049   @param timeout: maximum timeout for soft shutdown
1050   @rtype: None
1051
1052   """
1053   hv_name = instance.hypervisor
1054   hyper = hypervisor.GetHypervisor(hv_name)
1055   iname = instance.name
1056
1057   if instance.name not in hyper.ListInstances():
1058     logging.info("Instance %s not running, doing nothing", iname)
1059     return
1060
1061   class _TryShutdown:
1062     def __init__(self):
1063       self.tried_once = False
1064
1065     def __call__(self):
1066       if iname not in hyper.ListInstances():
1067         return
1068
1069       try:
1070         hyper.StopInstance(instance, retry=self.tried_once)
1071       except errors.HypervisorError, err:
1072         if iname not in hyper.ListInstances():
1073           # if the instance is no longer existing, consider this a
1074           # success and go to cleanup
1075           return
1076
1077         _Fail("Failed to stop instance %s: %s", iname, err)
1078
1079       self.tried_once = True
1080
1081       raise utils.RetryAgain()
1082
1083   try:
1084     utils.Retry(_TryShutdown(), 5, timeout)
1085   except utils.RetryTimeout:
1086     # the shutdown did not succeed
1087     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1088
1089     try:
1090       hyper.StopInstance(instance, force=True)
1091     except errors.HypervisorError, err:
1092       if iname in hyper.ListInstances():
1093         # only raise an error if the instance still exists, otherwise
1094         # the error could simply be "instance ... unknown"!
1095         _Fail("Failed to force stop instance %s: %s", iname, err)
1096
1097     time.sleep(1)
1098
1099     if iname in hyper.ListInstances():
1100       _Fail("Could not shutdown instance %s even by destroy", iname)
1101
1102   try:
1103     hyper.CleanupInstance(instance.name)
1104   except errors.HypervisorError, err:
1105     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1106
1107   _RemoveBlockDevLinks(iname, instance.disks)
1108
1109
1110 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1111   """Reboot an instance.
1112
1113   @type instance: L{objects.Instance}
1114   @param instance: the instance object to reboot
1115   @type reboot_type: str
1116   @param reboot_type: the type of reboot, one the following
1117     constants:
1118       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1119         instance OS, do not recreate the VM
1120       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1121         restart the VM (at the hypervisor level)
1122       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1123         not accepted here, since that mode is handled differently, in
1124         cmdlib, and translates into full stop and start of the
1125         instance (instead of a call_instance_reboot RPC)
1126   @type shutdown_timeout: integer
1127   @param shutdown_timeout: maximum timeout for soft shutdown
1128   @rtype: None
1129
1130   """
1131   running_instances = GetInstanceList([instance.hypervisor])
1132
1133   if instance.name not in running_instances:
1134     _Fail("Cannot reboot instance %s that is not running", instance.name)
1135
1136   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1137   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1138     try:
1139       hyper.RebootInstance(instance)
1140     except errors.HypervisorError, err:
1141       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1142   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1143     try:
1144       InstanceShutdown(instance, shutdown_timeout)
1145       return StartInstance(instance)
1146     except errors.HypervisorError, err:
1147       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1148   else:
1149     _Fail("Invalid reboot_type received: %s", reboot_type)
1150
1151
1152 def MigrationInfo(instance):
1153   """Gather information about an instance to be migrated.
1154
1155   @type instance: L{objects.Instance}
1156   @param instance: the instance definition
1157
1158   """
1159   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1160   try:
1161     info = hyper.MigrationInfo(instance)
1162   except errors.HypervisorError, err:
1163     _Fail("Failed to fetch migration information: %s", err, exc=True)
1164   return info
1165
1166
1167 def AcceptInstance(instance, info, target):
1168   """Prepare the node to accept an instance.
1169
1170   @type instance: L{objects.Instance}
1171   @param instance: the instance definition
1172   @type info: string/data (opaque)
1173   @param info: migration information, from the source node
1174   @type target: string
1175   @param target: target host (usually ip), on this node
1176
1177   """
1178   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1179   try:
1180     hyper.AcceptInstance(instance, info, target)
1181   except errors.HypervisorError, err:
1182     _Fail("Failed to accept instance: %s", err, exc=True)
1183
1184
1185 def FinalizeMigration(instance, info, success):
1186   """Finalize any preparation to accept an instance.
1187
1188   @type instance: L{objects.Instance}
1189   @param instance: the instance definition
1190   @type info: string/data (opaque)
1191   @param info: migration information, from the source node
1192   @type success: boolean
1193   @param success: whether the migration was a success or a failure
1194
1195   """
1196   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1197   try:
1198     hyper.FinalizeMigration(instance, info, success)
1199   except errors.HypervisorError, err:
1200     _Fail("Failed to finalize migration: %s", err, exc=True)
1201
1202
1203 def MigrateInstance(instance, target, live):
1204   """Migrates an instance to another node.
1205
1206   @type instance: L{objects.Instance}
1207   @param instance: the instance definition
1208   @type target: string
1209   @param target: the target node name
1210   @type live: boolean
1211   @param live: whether the migration should be done live or not (the
1212       interpretation of this parameter is left to the hypervisor)
1213   @rtype: tuple
1214   @return: a tuple of (success, msg) where:
1215       - succes is a boolean denoting the success/failure of the operation
1216       - msg is a string with details in case of failure
1217
1218   """
1219   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1220
1221   try:
1222     hyper.MigrateInstance(instance, target, live)
1223   except errors.HypervisorError, err:
1224     _Fail("Failed to migrate instance: %s", err, exc=True)
1225
1226
1227 def BlockdevCreate(disk, size, owner, on_primary, info):
1228   """Creates a block device for an instance.
1229
1230   @type disk: L{objects.Disk}
1231   @param disk: the object describing the disk we should create
1232   @type size: int
1233   @param size: the size of the physical underlying device, in MiB
1234   @type owner: str
1235   @param owner: the name of the instance for which disk is created,
1236       used for device cache data
1237   @type on_primary: boolean
1238   @param on_primary:  indicates if it is the primary node or not
1239   @type info: string
1240   @param info: string that will be sent to the physical device
1241       creation, used for example to set (LVM) tags on LVs
1242
1243   @return: the new unique_id of the device (this can sometime be
1244       computed only after creation), or None. On secondary nodes,
1245       it's not required to return anything.
1246
1247   """
1248   # TODO: remove the obsolete 'size' argument
1249   # pylint: disable-msg=W0613
1250   clist = []
1251   if disk.children:
1252     for child in disk.children:
1253       try:
1254         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1255       except errors.BlockDeviceError, err:
1256         _Fail("Can't assemble device %s: %s", child, err)
1257       if on_primary or disk.AssembleOnSecondary():
1258         # we need the children open in case the device itself has to
1259         # be assembled
1260         try:
1261           # pylint: disable-msg=E1103
1262           crdev.Open()
1263         except errors.BlockDeviceError, err:
1264           _Fail("Can't make child '%s' read-write: %s", child, err)
1265       clist.append(crdev)
1266
1267   try:
1268     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1269   except errors.BlockDeviceError, err:
1270     _Fail("Can't create block device: %s", err)
1271
1272   if on_primary or disk.AssembleOnSecondary():
1273     try:
1274       device.Assemble()
1275     except errors.BlockDeviceError, err:
1276       _Fail("Can't assemble device after creation, unusual event: %s", err)
1277     device.SetSyncSpeed(constants.SYNC_SPEED)
1278     if on_primary or disk.OpenOnSecondary():
1279       try:
1280         device.Open(force=True)
1281       except errors.BlockDeviceError, err:
1282         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1283     DevCacheManager.UpdateCache(device.dev_path, owner,
1284                                 on_primary, disk.iv_name)
1285
1286   device.SetInfo(info)
1287
1288   return device.unique_id
1289
1290
1291 def _WipeDevice(path, offset, size):
1292   """This function actually wipes the device.
1293
1294   @param path: The path to the device to wipe
1295   @param offset: The offset in MiB in the file
1296   @param size: The size in MiB to write
1297
1298   """
1299   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1300          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1301          "count=%d" % size]
1302   result = utils.RunCmd(cmd)
1303
1304   if result.failed:
1305     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1306           result.fail_reason, result.output)
1307
1308
1309 def BlockdevWipe(disk, offset, size):
1310   """Wipes a block device.
1311
1312   @type disk: L{objects.Disk}
1313   @param disk: the disk object we want to wipe
1314   @type offset: int
1315   @param offset: The offset in MiB in the file
1316   @type size: int
1317   @param size: The size in MiB to write
1318
1319   """
1320   try:
1321     rdev = _RecursiveFindBD(disk)
1322   except errors.BlockDeviceError:
1323     rdev = None
1324
1325   if not rdev:
1326     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1327
1328   # Do cross verify some of the parameters
1329   if offset > rdev.size:
1330     _Fail("Offset is bigger than device size")
1331   if (offset + size) > rdev.size:
1332     _Fail("The provided offset and size to wipe is bigger than device size")
1333
1334   _WipeDevice(rdev.dev_path, offset, size)
1335
1336
1337 def BlockdevRemove(disk):
1338   """Remove a block device.
1339
1340   @note: This is intended to be called recursively.
1341
1342   @type disk: L{objects.Disk}
1343   @param disk: the disk object we should remove
1344   @rtype: boolean
1345   @return: the success of the operation
1346
1347   """
1348   msgs = []
1349   try:
1350     rdev = _RecursiveFindBD(disk)
1351   except errors.BlockDeviceError, err:
1352     # probably can't attach
1353     logging.info("Can't attach to device %s in remove", disk)
1354     rdev = None
1355   if rdev is not None:
1356     r_path = rdev.dev_path
1357     try:
1358       rdev.Remove()
1359     except errors.BlockDeviceError, err:
1360       msgs.append(str(err))
1361     if not msgs:
1362       DevCacheManager.RemoveCache(r_path)
1363
1364   if disk.children:
1365     for child in disk.children:
1366       try:
1367         BlockdevRemove(child)
1368       except RPCFail, err:
1369         msgs.append(str(err))
1370
1371   if msgs:
1372     _Fail("; ".join(msgs))
1373
1374
1375 def _RecursiveAssembleBD(disk, owner, as_primary):
1376   """Activate a block device for an instance.
1377
1378   This is run on the primary and secondary nodes for an instance.
1379
1380   @note: this function is called recursively.
1381
1382   @type disk: L{objects.Disk}
1383   @param disk: the disk we try to assemble
1384   @type owner: str
1385   @param owner: the name of the instance which owns the disk
1386   @type as_primary: boolean
1387   @param as_primary: if we should make the block device
1388       read/write
1389
1390   @return: the assembled device or None (in case no device
1391       was assembled)
1392   @raise errors.BlockDeviceError: in case there is an error
1393       during the activation of the children or the device
1394       itself
1395
1396   """
1397   children = []
1398   if disk.children:
1399     mcn = disk.ChildrenNeeded()
1400     if mcn == -1:
1401       mcn = 0 # max number of Nones allowed
1402     else:
1403       mcn = len(disk.children) - mcn # max number of Nones
1404     for chld_disk in disk.children:
1405       try:
1406         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1407       except errors.BlockDeviceError, err:
1408         if children.count(None) >= mcn:
1409           raise
1410         cdev = None
1411         logging.error("Error in child activation (but continuing): %s",
1412                       str(err))
1413       children.append(cdev)
1414
1415   if as_primary or disk.AssembleOnSecondary():
1416     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1417     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1418     result = r_dev
1419     if as_primary or disk.OpenOnSecondary():
1420       r_dev.Open()
1421     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1422                                 as_primary, disk.iv_name)
1423
1424   else:
1425     result = True
1426   return result
1427
1428
1429 def BlockdevAssemble(disk, owner, as_primary):
1430   """Activate a block device for an instance.
1431
1432   This is a wrapper over _RecursiveAssembleBD.
1433
1434   @rtype: str or boolean
1435   @return: a C{/dev/...} path for primary nodes, and
1436       C{True} for secondary nodes
1437
1438   """
1439   try:
1440     result = _RecursiveAssembleBD(disk, owner, as_primary)
1441     if isinstance(result, bdev.BlockDev):
1442       # pylint: disable-msg=E1103
1443       result = result.dev_path
1444   except errors.BlockDeviceError, err:
1445     _Fail("Error while assembling disk: %s", err, exc=True)
1446
1447   return result
1448
1449
1450 def BlockdevShutdown(disk):
1451   """Shut down a block device.
1452
1453   First, if the device is assembled (Attach() is successful), then
1454   the device is shutdown. Then the children of the device are
1455   shutdown.
1456
1457   This function is called recursively. Note that we don't cache the
1458   children or such, as oppossed to assemble, shutdown of different
1459   devices doesn't require that the upper device was active.
1460
1461   @type disk: L{objects.Disk}
1462   @param disk: the description of the disk we should
1463       shutdown
1464   @rtype: None
1465
1466   """
1467   msgs = []
1468   r_dev = _RecursiveFindBD(disk)
1469   if r_dev is not None:
1470     r_path = r_dev.dev_path
1471     try:
1472       r_dev.Shutdown()
1473       DevCacheManager.RemoveCache(r_path)
1474     except errors.BlockDeviceError, err:
1475       msgs.append(str(err))
1476
1477   if disk.children:
1478     for child in disk.children:
1479       try:
1480         BlockdevShutdown(child)
1481       except RPCFail, err:
1482         msgs.append(str(err))
1483
1484   if msgs:
1485     _Fail("; ".join(msgs))
1486
1487
1488 def BlockdevAddchildren(parent_cdev, new_cdevs):
1489   """Extend a mirrored block device.
1490
1491   @type parent_cdev: L{objects.Disk}
1492   @param parent_cdev: the disk to which we should add children
1493   @type new_cdevs: list of L{objects.Disk}
1494   @param new_cdevs: the list of children which we should add
1495   @rtype: None
1496
1497   """
1498   parent_bdev = _RecursiveFindBD(parent_cdev)
1499   if parent_bdev is None:
1500     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1501   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1502   if new_bdevs.count(None) > 0:
1503     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1504   parent_bdev.AddChildren(new_bdevs)
1505
1506
1507 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1508   """Shrink a mirrored block device.
1509
1510   @type parent_cdev: L{objects.Disk}
1511   @param parent_cdev: the disk from which we should remove children
1512   @type new_cdevs: list of L{objects.Disk}
1513   @param new_cdevs: the list of children which we should remove
1514   @rtype: None
1515
1516   """
1517   parent_bdev = _RecursiveFindBD(parent_cdev)
1518   if parent_bdev is None:
1519     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1520   devs = []
1521   for disk in new_cdevs:
1522     rpath = disk.StaticDevPath()
1523     if rpath is None:
1524       bd = _RecursiveFindBD(disk)
1525       if bd is None:
1526         _Fail("Can't find device %s while removing children", disk)
1527       else:
1528         devs.append(bd.dev_path)
1529     else:
1530       if not utils.IsNormAbsPath(rpath):
1531         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1532       devs.append(rpath)
1533   parent_bdev.RemoveChildren(devs)
1534
1535
1536 def BlockdevGetmirrorstatus(disks):
1537   """Get the mirroring status of a list of devices.
1538
1539   @type disks: list of L{objects.Disk}
1540   @param disks: the list of disks which we should query
1541   @rtype: disk
1542   @return: List of L{objects.BlockDevStatus}, one for each disk
1543   @raise errors.BlockDeviceError: if any of the disks cannot be
1544       found
1545
1546   """
1547   stats = []
1548   for dsk in disks:
1549     rbd = _RecursiveFindBD(dsk)
1550     if rbd is None:
1551       _Fail("Can't find device %s", dsk)
1552
1553     stats.append(rbd.CombinedSyncStatus())
1554
1555   return stats
1556
1557
1558 def BlockdevGetmirrorstatusMulti(disks):
1559   """Get the mirroring status of a list of devices.
1560
1561   @type disks: list of L{objects.Disk}
1562   @param disks: the list of disks which we should query
1563   @rtype: disk
1564   @return: List of tuples, (bool, status), one for each disk; bool denotes
1565     success/failure, status is L{objects.BlockDevStatus} on success, string
1566     otherwise
1567
1568   """
1569   result = []
1570   for disk in disks:
1571     try:
1572       rbd = _RecursiveFindBD(disk)
1573       if rbd is None:
1574         result.append((False, "Can't find device %s" % disk))
1575         continue
1576
1577       status = rbd.CombinedSyncStatus()
1578     except errors.BlockDeviceError, err:
1579       logging.exception("Error while getting disk status")
1580       result.append((False, str(err)))
1581     else:
1582       result.append((True, status))
1583
1584   assert len(disks) == len(result)
1585
1586   return result
1587
1588
1589 def _RecursiveFindBD(disk):
1590   """Check if a device is activated.
1591
1592   If so, return information about the real device.
1593
1594   @type disk: L{objects.Disk}
1595   @param disk: the disk object we need to find
1596
1597   @return: None if the device can't be found,
1598       otherwise the device instance
1599
1600   """
1601   children = []
1602   if disk.children:
1603     for chdisk in disk.children:
1604       children.append(_RecursiveFindBD(chdisk))
1605
1606   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1607
1608
1609 def _OpenRealBD(disk):
1610   """Opens the underlying block device of a disk.
1611
1612   @type disk: L{objects.Disk}
1613   @param disk: the disk object we want to open
1614
1615   """
1616   real_disk = _RecursiveFindBD(disk)
1617   if real_disk is None:
1618     _Fail("Block device '%s' is not set up", disk)
1619
1620   real_disk.Open()
1621
1622   return real_disk
1623
1624
1625 def BlockdevFind(disk):
1626   """Check if a device is activated.
1627
1628   If it is, return information about the real device.
1629
1630   @type disk: L{objects.Disk}
1631   @param disk: the disk to find
1632   @rtype: None or objects.BlockDevStatus
1633   @return: None if the disk cannot be found, otherwise a the current
1634            information
1635
1636   """
1637   try:
1638     rbd = _RecursiveFindBD(disk)
1639   except errors.BlockDeviceError, err:
1640     _Fail("Failed to find device: %s", err, exc=True)
1641
1642   if rbd is None:
1643     return None
1644
1645   return rbd.GetSyncStatus()
1646
1647
1648 def BlockdevGetsize(disks):
1649   """Computes the size of the given disks.
1650
1651   If a disk is not found, returns None instead.
1652
1653   @type disks: list of L{objects.Disk}
1654   @param disks: the list of disk to compute the size for
1655   @rtype: list
1656   @return: list with elements None if the disk cannot be found,
1657       otherwise the size
1658
1659   """
1660   result = []
1661   for cf in disks:
1662     try:
1663       rbd = _RecursiveFindBD(cf)
1664     except errors.BlockDeviceError:
1665       result.append(None)
1666       continue
1667     if rbd is None:
1668       result.append(None)
1669     else:
1670       result.append(rbd.GetActualSize())
1671   return result
1672
1673
1674 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1675   """Export a block device to a remote node.
1676
1677   @type disk: L{objects.Disk}
1678   @param disk: the description of the disk to export
1679   @type dest_node: str
1680   @param dest_node: the destination node to export to
1681   @type dest_path: str
1682   @param dest_path: the destination path on the target node
1683   @type cluster_name: str
1684   @param cluster_name: the cluster name, needed for SSH hostalias
1685   @rtype: None
1686
1687   """
1688   real_disk = _OpenRealBD(disk)
1689
1690   # the block size on the read dd is 1MiB to match our units
1691   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1692                                "dd if=%s bs=1048576 count=%s",
1693                                real_disk.dev_path, str(disk.size))
1694
1695   # we set here a smaller block size as, due to ssh buffering, more
1696   # than 64-128k will mostly ignored; we use nocreat to fail if the
1697   # device is not already there or we pass a wrong path; we use
1698   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1699   # to not buffer too much memory; this means that at best, we flush
1700   # every 64k, which will not be very fast
1701   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1702                                 " oflag=dsync", dest_path)
1703
1704   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1705                                                    constants.GANETI_RUNAS,
1706                                                    destcmd)
1707
1708   # all commands have been checked, so we're safe to combine them
1709   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1710
1711   result = utils.RunCmd(["bash", "-c", command])
1712
1713   if result.failed:
1714     _Fail("Disk copy command '%s' returned error: %s"
1715           " output: %s", command, result.fail_reason, result.output)
1716
1717
1718 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1719   """Write a file to the filesystem.
1720
1721   This allows the master to overwrite(!) a file. It will only perform
1722   the operation if the file belongs to a list of configuration files.
1723
1724   @type file_name: str
1725   @param file_name: the target file name
1726   @type data: str
1727   @param data: the new contents of the file
1728   @type mode: int
1729   @param mode: the mode to give the file (can be None)
1730   @type uid: int
1731   @param uid: the owner of the file (can be -1 for default)
1732   @type gid: int
1733   @param gid: the group of the file (can be -1 for default)
1734   @type atime: float
1735   @param atime: the atime to set on the file (can be None)
1736   @type mtime: float
1737   @param mtime: the mtime to set on the file (can be None)
1738   @rtype: None
1739
1740   """
1741   if not os.path.isabs(file_name):
1742     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1743
1744   if file_name not in _ALLOWED_UPLOAD_FILES:
1745     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1746           file_name)
1747
1748   raw_data = _Decompress(data)
1749
1750   utils.SafeWriteFile(file_name, None,
1751                       data=raw_data, mode=mode, uid=uid, gid=gid,
1752                       atime=atime, mtime=mtime)
1753
1754
1755 def WriteSsconfFiles(values):
1756   """Update all ssconf files.
1757
1758   Wrapper around the SimpleStore.WriteFiles.
1759
1760   """
1761   ssconf.SimpleStore().WriteFiles(values)
1762
1763
1764 def _ErrnoOrStr(err):
1765   """Format an EnvironmentError exception.
1766
1767   If the L{err} argument has an errno attribute, it will be looked up
1768   and converted into a textual C{E...} description. Otherwise the
1769   string representation of the error will be returned.
1770
1771   @type err: L{EnvironmentError}
1772   @param err: the exception to format
1773
1774   """
1775   if hasattr(err, 'errno'):
1776     detail = errno.errorcode[err.errno]
1777   else:
1778     detail = str(err)
1779   return detail
1780
1781
1782 def _OSOndiskAPIVersion(os_dir):
1783   """Compute and return the API version of a given OS.
1784
1785   This function will try to read the API version of the OS residing in
1786   the 'os_dir' directory.
1787
1788   @type os_dir: str
1789   @param os_dir: the directory in which we should look for the OS
1790   @rtype: tuple
1791   @return: tuple (status, data) with status denoting the validity and
1792       data holding either the vaid versions or an error message
1793
1794   """
1795   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1796
1797   try:
1798     st = os.stat(api_file)
1799   except EnvironmentError, err:
1800     return False, ("Required file '%s' not found under path %s: %s" %
1801                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1802
1803   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1804     return False, ("File '%s' in %s is not a regular file" %
1805                    (constants.OS_API_FILE, os_dir))
1806
1807   try:
1808     api_versions = utils.ReadFile(api_file).splitlines()
1809   except EnvironmentError, err:
1810     return False, ("Error while reading the API version file at %s: %s" %
1811                    (api_file, _ErrnoOrStr(err)))
1812
1813   try:
1814     api_versions = [int(version.strip()) for version in api_versions]
1815   except (TypeError, ValueError), err:
1816     return False, ("API version(s) can't be converted to integer: %s" %
1817                    str(err))
1818
1819   return True, api_versions
1820
1821
1822 def DiagnoseOS(top_dirs=None):
1823   """Compute the validity for all OSes.
1824
1825   @type top_dirs: list
1826   @param top_dirs: the list of directories in which to
1827       search (if not given defaults to
1828       L{constants.OS_SEARCH_PATH})
1829   @rtype: list of L{objects.OS}
1830   @return: a list of tuples (name, path, status, diagnose, variants,
1831       parameters, api_version) for all (potential) OSes under all
1832       search paths, where:
1833           - name is the (potential) OS name
1834           - path is the full path to the OS
1835           - status True/False is the validity of the OS
1836           - diagnose is the error message for an invalid OS, otherwise empty
1837           - variants is a list of supported OS variants, if any
1838           - parameters is a list of (name, help) parameters, if any
1839           - api_version is a list of support OS API versions
1840
1841   """
1842   if top_dirs is None:
1843     top_dirs = constants.OS_SEARCH_PATH
1844
1845   result = []
1846   for dir_name in top_dirs:
1847     if os.path.isdir(dir_name):
1848       try:
1849         f_names = utils.ListVisibleFiles(dir_name)
1850       except EnvironmentError, err:
1851         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1852         break
1853       for name in f_names:
1854         os_path = utils.PathJoin(dir_name, name)
1855         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1856         if status:
1857           diagnose = ""
1858           variants = os_inst.supported_variants
1859           parameters = os_inst.supported_parameters
1860           api_versions = os_inst.api_versions
1861         else:
1862           diagnose = os_inst
1863           variants = parameters = api_versions = []
1864         result.append((name, os_path, status, diagnose, variants,
1865                        parameters, api_versions))
1866
1867   return result
1868
1869
1870 def _TryOSFromDisk(name, base_dir=None):
1871   """Create an OS instance from disk.
1872
1873   This function will return an OS instance if the given name is a
1874   valid OS name.
1875
1876   @type base_dir: string
1877   @keyword base_dir: Base directory containing OS installations.
1878                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1879   @rtype: tuple
1880   @return: success and either the OS instance if we find a valid one,
1881       or error message
1882
1883   """
1884   if base_dir is None:
1885     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1886   else:
1887     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1888
1889   if os_dir is None:
1890     return False, "Directory for OS %s not found in search path" % name
1891
1892   status, api_versions = _OSOndiskAPIVersion(os_dir)
1893   if not status:
1894     # push the error up
1895     return status, api_versions
1896
1897   if not constants.OS_API_VERSIONS.intersection(api_versions):
1898     return False, ("API version mismatch for path '%s': found %s, want %s." %
1899                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1900
1901   # OS Files dictionary, we will populate it with the absolute path names
1902   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1903
1904   if max(api_versions) >= constants.OS_API_V15:
1905     os_files[constants.OS_VARIANTS_FILE] = ''
1906
1907   if max(api_versions) >= constants.OS_API_V20:
1908     os_files[constants.OS_PARAMETERS_FILE] = ''
1909   else:
1910     del os_files[constants.OS_SCRIPT_VERIFY]
1911
1912   for filename in os_files:
1913     os_files[filename] = utils.PathJoin(os_dir, filename)
1914
1915     try:
1916       st = os.stat(os_files[filename])
1917     except EnvironmentError, err:
1918       return False, ("File '%s' under path '%s' is missing (%s)" %
1919                      (filename, os_dir, _ErrnoOrStr(err)))
1920
1921     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1922       return False, ("File '%s' under path '%s' is not a regular file" %
1923                      (filename, os_dir))
1924
1925     if filename in constants.OS_SCRIPTS:
1926       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1927         return False, ("File '%s' under path '%s' is not executable" %
1928                        (filename, os_dir))
1929
1930   variants = []
1931   if constants.OS_VARIANTS_FILE in os_files:
1932     variants_file = os_files[constants.OS_VARIANTS_FILE]
1933     try:
1934       variants = utils.ReadFile(variants_file).splitlines()
1935     except EnvironmentError, err:
1936       return False, ("Error while reading the OS variants file at %s: %s" %
1937                      (variants_file, _ErrnoOrStr(err)))
1938     if not variants:
1939       return False, ("No supported os variant found")
1940
1941   parameters = []
1942   if constants.OS_PARAMETERS_FILE in os_files:
1943     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1944     try:
1945       parameters = utils.ReadFile(parameters_file).splitlines()
1946     except EnvironmentError, err:
1947       return False, ("Error while reading the OS parameters file at %s: %s" %
1948                      (parameters_file, _ErrnoOrStr(err)))
1949     parameters = [v.split(None, 1) for v in parameters]
1950
1951   os_obj = objects.OS(name=name, path=os_dir,
1952                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1953                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1954                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1955                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1956                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1957                                                  None),
1958                       supported_variants=variants,
1959                       supported_parameters=parameters,
1960                       api_versions=api_versions)
1961   return True, os_obj
1962
1963
1964 def OSFromDisk(name, base_dir=None):
1965   """Create an OS instance from disk.
1966
1967   This function will return an OS instance if the given name is a
1968   valid OS name. Otherwise, it will raise an appropriate
1969   L{RPCFail} exception, detailing why this is not a valid OS.
1970
1971   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1972   an exception but returns true/false status data.
1973
1974   @type base_dir: string
1975   @keyword base_dir: Base directory containing OS installations.
1976                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1977   @rtype: L{objects.OS}
1978   @return: the OS instance if we find a valid one
1979   @raise RPCFail: if we don't find a valid OS
1980
1981   """
1982   name_only = objects.OS.GetName(name)
1983   status, payload = _TryOSFromDisk(name_only, base_dir)
1984
1985   if not status:
1986     _Fail(payload)
1987
1988   return payload
1989
1990
1991 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
1992   """Calculate the basic environment for an os script.
1993
1994   @type os_name: str
1995   @param os_name: full operating system name (including variant)
1996   @type inst_os: L{objects.OS}
1997   @param inst_os: operating system for which the environment is being built
1998   @type os_params: dict
1999   @param os_params: the OS parameters
2000   @type debug: integer
2001   @param debug: debug level (0 or 1, for OS Api 10)
2002   @rtype: dict
2003   @return: dict of environment variables
2004   @raise errors.BlockDeviceError: if the block device
2005       cannot be found
2006
2007   """
2008   result = {}
2009   api_version = \
2010     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2011   result['OS_API_VERSION'] = '%d' % api_version
2012   result['OS_NAME'] = inst_os.name
2013   result['DEBUG_LEVEL'] = '%d' % debug
2014
2015   # OS variants
2016   if api_version >= constants.OS_API_V15:
2017     variant = objects.OS.GetVariant(os_name)
2018     if not variant:
2019       variant = inst_os.supported_variants[0]
2020     result['OS_VARIANT'] = variant
2021
2022   # OS params
2023   for pname, pvalue in os_params.items():
2024     result['OSP_%s' % pname.upper()] = pvalue
2025
2026   return result
2027
2028
2029 def OSEnvironment(instance, inst_os, debug=0):
2030   """Calculate the environment for an os script.
2031
2032   @type instance: L{objects.Instance}
2033   @param instance: target instance for the os script run
2034   @type inst_os: L{objects.OS}
2035   @param inst_os: operating system for which the environment is being built
2036   @type debug: integer
2037   @param debug: debug level (0 or 1, for OS Api 10)
2038   @rtype: dict
2039   @return: dict of environment variables
2040   @raise errors.BlockDeviceError: if the block device
2041       cannot be found
2042
2043   """
2044   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2045
2046   for attr in ["name", "os", "uuid", "ctime", "mtime"]:
2047     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2048
2049   result['HYPERVISOR'] = instance.hypervisor
2050   result['DISK_COUNT'] = '%d' % len(instance.disks)
2051   result['NIC_COUNT'] = '%d' % len(instance.nics)
2052
2053   # Disks
2054   for idx, disk in enumerate(instance.disks):
2055     real_disk = _OpenRealBD(disk)
2056     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2057     result['DISK_%d_ACCESS' % idx] = disk.mode
2058     if constants.HV_DISK_TYPE in instance.hvparams:
2059       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2060         instance.hvparams[constants.HV_DISK_TYPE]
2061     if disk.dev_type in constants.LDS_BLOCK:
2062       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2063     elif disk.dev_type == constants.LD_FILE:
2064       result['DISK_%d_BACKEND_TYPE' % idx] = \
2065         'file:%s' % disk.physical_id[0]
2066
2067   # NICs
2068   for idx, nic in enumerate(instance.nics):
2069     result['NIC_%d_MAC' % idx] = nic.mac
2070     if nic.ip:
2071       result['NIC_%d_IP' % idx] = nic.ip
2072     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2073     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2074       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2075     if nic.nicparams[constants.NIC_LINK]:
2076       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2077     if constants.HV_NIC_TYPE in instance.hvparams:
2078       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2079         instance.hvparams[constants.HV_NIC_TYPE]
2080
2081   # HV/BE params
2082   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2083     for key, value in source.items():
2084       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2085
2086   return result
2087
2088
2089 def BlockdevGrow(disk, amount):
2090   """Grow a stack of block devices.
2091
2092   This function is called recursively, with the childrens being the
2093   first ones to resize.
2094
2095   @type disk: L{objects.Disk}
2096   @param disk: the disk to be grown
2097   @rtype: (status, result)
2098   @return: a tuple with the status of the operation
2099       (True/False), and the errors message if status
2100       is False
2101
2102   """
2103   r_dev = _RecursiveFindBD(disk)
2104   if r_dev is None:
2105     _Fail("Cannot find block device %s", disk)
2106
2107   try:
2108     r_dev.Grow(amount)
2109   except errors.BlockDeviceError, err:
2110     _Fail("Failed to grow block device: %s", err, exc=True)
2111
2112
2113 def BlockdevSnapshot(disk):
2114   """Create a snapshot copy of a block device.
2115
2116   This function is called recursively, and the snapshot is actually created
2117   just for the leaf lvm backend device.
2118
2119   @type disk: L{objects.Disk}
2120   @param disk: the disk to be snapshotted
2121   @rtype: string
2122   @return: snapshot disk path
2123
2124   """
2125   if disk.dev_type == constants.LD_DRBD8:
2126     if not disk.children:
2127       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2128             disk.unique_id)
2129     return BlockdevSnapshot(disk.children[0])
2130   elif disk.dev_type == constants.LD_LV:
2131     r_dev = _RecursiveFindBD(disk)
2132     if r_dev is not None:
2133       # FIXME: choose a saner value for the snapshot size
2134       # let's stay on the safe side and ask for the full size, for now
2135       return r_dev.Snapshot(disk.size)
2136     else:
2137       _Fail("Cannot find block device %s", disk)
2138   else:
2139     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2140           disk.unique_id, disk.dev_type)
2141
2142
2143 def FinalizeExport(instance, snap_disks):
2144   """Write out the export configuration information.
2145
2146   @type instance: L{objects.Instance}
2147   @param instance: the instance which we export, used for
2148       saving configuration
2149   @type snap_disks: list of L{objects.Disk}
2150   @param snap_disks: list of snapshot block devices, which
2151       will be used to get the actual name of the dump file
2152
2153   @rtype: None
2154
2155   """
2156   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2157   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2158
2159   config = objects.SerializableConfigParser()
2160
2161   config.add_section(constants.INISECT_EXP)
2162   config.set(constants.INISECT_EXP, 'version', '0')
2163   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2164   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2165   config.set(constants.INISECT_EXP, 'os', instance.os)
2166   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2167
2168   config.add_section(constants.INISECT_INS)
2169   config.set(constants.INISECT_INS, 'name', instance.name)
2170   config.set(constants.INISECT_INS, 'memory', '%d' %
2171              instance.beparams[constants.BE_MEMORY])
2172   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2173              instance.beparams[constants.BE_VCPUS])
2174   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2175   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2176
2177   nic_total = 0
2178   for nic_count, nic in enumerate(instance.nics):
2179     nic_total += 1
2180     config.set(constants.INISECT_INS, 'nic%d_mac' %
2181                nic_count, '%s' % nic.mac)
2182     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2183     for param in constants.NICS_PARAMETER_TYPES:
2184       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2185                  '%s' % nic.nicparams.get(param, None))
2186   # TODO: redundant: on load can read nics until it doesn't exist
2187   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2188
2189   disk_total = 0
2190   for disk_count, disk in enumerate(snap_disks):
2191     if disk:
2192       disk_total += 1
2193       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2194                  ('%s' % disk.iv_name))
2195       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2196                  ('%s' % disk.physical_id[1]))
2197       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2198                  ('%d' % disk.size))
2199
2200   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2201
2202   # New-style hypervisor/backend parameters
2203
2204   config.add_section(constants.INISECT_HYP)
2205   for name, value in instance.hvparams.items():
2206     if name not in constants.HVC_GLOBALS:
2207       config.set(constants.INISECT_HYP, name, str(value))
2208
2209   config.add_section(constants.INISECT_BEP)
2210   for name, value in instance.beparams.items():
2211     config.set(constants.INISECT_BEP, name, str(value))
2212
2213   config.add_section(constants.INISECT_OSP)
2214   for name, value in instance.osparams.items():
2215     config.set(constants.INISECT_OSP, name, str(value))
2216
2217   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2218                   data=config.Dumps())
2219   shutil.rmtree(finaldestdir, ignore_errors=True)
2220   shutil.move(destdir, finaldestdir)
2221
2222
2223 def ExportInfo(dest):
2224   """Get export configuration information.
2225
2226   @type dest: str
2227   @param dest: directory containing the export
2228
2229   @rtype: L{objects.SerializableConfigParser}
2230   @return: a serializable config file containing the
2231       export info
2232
2233   """
2234   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2235
2236   config = objects.SerializableConfigParser()
2237   config.read(cff)
2238
2239   if (not config.has_section(constants.INISECT_EXP) or
2240       not config.has_section(constants.INISECT_INS)):
2241     _Fail("Export info file doesn't have the required fields")
2242
2243   return config.Dumps()
2244
2245
2246 def ListExports():
2247   """Return a list of exports currently available on this machine.
2248
2249   @rtype: list
2250   @return: list of the exports
2251
2252   """
2253   if os.path.isdir(constants.EXPORT_DIR):
2254     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2255   else:
2256     _Fail("No exports directory")
2257
2258
2259 def RemoveExport(export):
2260   """Remove an existing export from the node.
2261
2262   @type export: str
2263   @param export: the name of the export to remove
2264   @rtype: None
2265
2266   """
2267   target = utils.PathJoin(constants.EXPORT_DIR, export)
2268
2269   try:
2270     shutil.rmtree(target)
2271   except EnvironmentError, err:
2272     _Fail("Error while removing the export: %s", err, exc=True)
2273
2274
2275 def BlockdevRename(devlist):
2276   """Rename a list of block devices.
2277
2278   @type devlist: list of tuples
2279   @param devlist: list of tuples of the form  (disk,
2280       new_logical_id, new_physical_id); disk is an
2281       L{objects.Disk} object describing the current disk,
2282       and new logical_id/physical_id is the name we
2283       rename it to
2284   @rtype: boolean
2285   @return: True if all renames succeeded, False otherwise
2286
2287   """
2288   msgs = []
2289   result = True
2290   for disk, unique_id in devlist:
2291     dev = _RecursiveFindBD(disk)
2292     if dev is None:
2293       msgs.append("Can't find device %s in rename" % str(disk))
2294       result = False
2295       continue
2296     try:
2297       old_rpath = dev.dev_path
2298       dev.Rename(unique_id)
2299       new_rpath = dev.dev_path
2300       if old_rpath != new_rpath:
2301         DevCacheManager.RemoveCache(old_rpath)
2302         # FIXME: we should add the new cache information here, like:
2303         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2304         # but we don't have the owner here - maybe parse from existing
2305         # cache? for now, we only lose lvm data when we rename, which
2306         # is less critical than DRBD or MD
2307     except errors.BlockDeviceError, err:
2308       msgs.append("Can't rename device '%s' to '%s': %s" %
2309                   (dev, unique_id, err))
2310       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2311       result = False
2312   if not result:
2313     _Fail("; ".join(msgs))
2314
2315
2316 def _TransformFileStorageDir(file_storage_dir):
2317   """Checks whether given file_storage_dir is valid.
2318
2319   Checks wheter the given file_storage_dir is within the cluster-wide
2320   default file_storage_dir stored in SimpleStore. Only paths under that
2321   directory are allowed.
2322
2323   @type file_storage_dir: str
2324   @param file_storage_dir: the path to check
2325
2326   @return: the normalized path if valid, None otherwise
2327
2328   """
2329   if not constants.ENABLE_FILE_STORAGE:
2330     _Fail("File storage disabled at configure time")
2331   cfg = _GetConfig()
2332   file_storage_dir = os.path.normpath(file_storage_dir)
2333   base_file_storage_dir = cfg.GetFileStorageDir()
2334   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2335       base_file_storage_dir):
2336     _Fail("File storage directory '%s' is not under base file"
2337           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2338   return file_storage_dir
2339
2340
2341 def CreateFileStorageDir(file_storage_dir):
2342   """Create file storage directory.
2343
2344   @type file_storage_dir: str
2345   @param file_storage_dir: directory to create
2346
2347   @rtype: tuple
2348   @return: tuple with first element a boolean indicating wheter dir
2349       creation was successful or not
2350
2351   """
2352   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2353   if os.path.exists(file_storage_dir):
2354     if not os.path.isdir(file_storage_dir):
2355       _Fail("Specified storage dir '%s' is not a directory",
2356             file_storage_dir)
2357   else:
2358     try:
2359       os.makedirs(file_storage_dir, 0750)
2360     except OSError, err:
2361       _Fail("Cannot create file storage directory '%s': %s",
2362             file_storage_dir, err, exc=True)
2363
2364
2365 def RemoveFileStorageDir(file_storage_dir):
2366   """Remove file storage directory.
2367
2368   Remove it only if it's empty. If not log an error and return.
2369
2370   @type file_storage_dir: str
2371   @param file_storage_dir: the directory we should cleanup
2372   @rtype: tuple (success,)
2373   @return: tuple of one element, C{success}, denoting
2374       whether the operation was successful
2375
2376   """
2377   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2378   if os.path.exists(file_storage_dir):
2379     if not os.path.isdir(file_storage_dir):
2380       _Fail("Specified Storage directory '%s' is not a directory",
2381             file_storage_dir)
2382     # deletes dir only if empty, otherwise we want to fail the rpc call
2383     try:
2384       os.rmdir(file_storage_dir)
2385     except OSError, err:
2386       _Fail("Cannot remove file storage directory '%s': %s",
2387             file_storage_dir, err)
2388
2389
2390 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2391   """Rename the file storage directory.
2392
2393   @type old_file_storage_dir: str
2394   @param old_file_storage_dir: the current path
2395   @type new_file_storage_dir: str
2396   @param new_file_storage_dir: the name we should rename to
2397   @rtype: tuple (success,)
2398   @return: tuple of one element, C{success}, denoting
2399       whether the operation was successful
2400
2401   """
2402   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2403   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2404   if not os.path.exists(new_file_storage_dir):
2405     if os.path.isdir(old_file_storage_dir):
2406       try:
2407         os.rename(old_file_storage_dir, new_file_storage_dir)
2408       except OSError, err:
2409         _Fail("Cannot rename '%s' to '%s': %s",
2410               old_file_storage_dir, new_file_storage_dir, err)
2411     else:
2412       _Fail("Specified storage dir '%s' is not a directory",
2413             old_file_storage_dir)
2414   else:
2415     if os.path.exists(old_file_storage_dir):
2416       _Fail("Cannot rename '%s' to '%s': both locations exist",
2417             old_file_storage_dir, new_file_storage_dir)
2418
2419
2420 def _EnsureJobQueueFile(file_name):
2421   """Checks whether the given filename is in the queue directory.
2422
2423   @type file_name: str
2424   @param file_name: the file name we should check
2425   @rtype: None
2426   @raises RPCFail: if the file is not valid
2427
2428   """
2429   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2430   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2431
2432   if not result:
2433     _Fail("Passed job queue file '%s' does not belong to"
2434           " the queue directory '%s'", file_name, queue_dir)
2435
2436
2437 def JobQueueUpdate(file_name, content):
2438   """Updates a file in the queue directory.
2439
2440   This is just a wrapper over L{utils.WriteFile}, with proper
2441   checking.
2442
2443   @type file_name: str
2444   @param file_name: the job file name
2445   @type content: str
2446   @param content: the new job contents
2447   @rtype: boolean
2448   @return: the success of the operation
2449
2450   """
2451   _EnsureJobQueueFile(file_name)
2452   getents = runtime.GetEnts()
2453
2454   # Write and replace the file atomically
2455   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2456                   gid=getents.masterd_gid)
2457
2458
2459 def JobQueueRename(old, new):
2460   """Renames a job queue file.
2461
2462   This is just a wrapper over os.rename with proper checking.
2463
2464   @type old: str
2465   @param old: the old (actual) file name
2466   @type new: str
2467   @param new: the desired file name
2468   @rtype: tuple
2469   @return: the success of the operation and payload
2470
2471   """
2472   _EnsureJobQueueFile(old)
2473   _EnsureJobQueueFile(new)
2474
2475   utils.RenameFile(old, new, mkdir=True)
2476
2477
2478 def BlockdevClose(instance_name, disks):
2479   """Closes the given block devices.
2480
2481   This means they will be switched to secondary mode (in case of
2482   DRBD).
2483
2484   @param instance_name: if the argument is not empty, the symlinks
2485       of this instance will be removed
2486   @type disks: list of L{objects.Disk}
2487   @param disks: the list of disks to be closed
2488   @rtype: tuple (success, message)
2489   @return: a tuple of success and message, where success
2490       indicates the succes of the operation, and message
2491       which will contain the error details in case we
2492       failed
2493
2494   """
2495   bdevs = []
2496   for cf in disks:
2497     rd = _RecursiveFindBD(cf)
2498     if rd is None:
2499       _Fail("Can't find device %s", cf)
2500     bdevs.append(rd)
2501
2502   msg = []
2503   for rd in bdevs:
2504     try:
2505       rd.Close()
2506     except errors.BlockDeviceError, err:
2507       msg.append(str(err))
2508   if msg:
2509     _Fail("Can't make devices secondary: %s", ",".join(msg))
2510   else:
2511     if instance_name:
2512       _RemoveBlockDevLinks(instance_name, disks)
2513
2514
2515 def ValidateHVParams(hvname, hvparams):
2516   """Validates the given hypervisor parameters.
2517
2518   @type hvname: string
2519   @param hvname: the hypervisor name
2520   @type hvparams: dict
2521   @param hvparams: the hypervisor parameters to be validated
2522   @rtype: None
2523
2524   """
2525   try:
2526     hv_type = hypervisor.GetHypervisor(hvname)
2527     hv_type.ValidateParameters(hvparams)
2528   except errors.HypervisorError, err:
2529     _Fail(str(err), log=False)
2530
2531
2532 def _CheckOSPList(os_obj, parameters):
2533   """Check whether a list of parameters is supported by the OS.
2534
2535   @type os_obj: L{objects.OS}
2536   @param os_obj: OS object to check
2537   @type parameters: list
2538   @param parameters: the list of parameters to check
2539
2540   """
2541   supported = [v[0] for v in os_obj.supported_parameters]
2542   delta = frozenset(parameters).difference(supported)
2543   if delta:
2544     _Fail("The following parameters are not supported"
2545           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2546
2547
2548 def ValidateOS(required, osname, checks, osparams):
2549   """Validate the given OS' parameters.
2550
2551   @type required: boolean
2552   @param required: whether absence of the OS should translate into
2553       failure or not
2554   @type osname: string
2555   @param osname: the OS to be validated
2556   @type checks: list
2557   @param checks: list of the checks to run (currently only 'parameters')
2558   @type osparams: dict
2559   @param osparams: dictionary with OS parameters
2560   @rtype: boolean
2561   @return: True if the validation passed, or False if the OS was not
2562       found and L{required} was false
2563
2564   """
2565   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2566     _Fail("Unknown checks required for OS %s: %s", osname,
2567           set(checks).difference(constants.OS_VALIDATE_CALLS))
2568
2569   name_only = objects.OS.GetName(osname)
2570   status, tbv = _TryOSFromDisk(name_only, None)
2571
2572   if not status:
2573     if required:
2574       _Fail(tbv)
2575     else:
2576       return False
2577
2578   if max(tbv.api_versions) < constants.OS_API_V20:
2579     return True
2580
2581   if constants.OS_VALIDATE_PARAMETERS in checks:
2582     _CheckOSPList(tbv, osparams.keys())
2583
2584   validate_env = OSCoreEnv(osname, tbv, osparams)
2585   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2586                         cwd=tbv.path)
2587   if result.failed:
2588     logging.error("os validate command '%s' returned error: %s output: %s",
2589                   result.cmd, result.fail_reason, result.output)
2590     _Fail("OS validation script failed (%s), output: %s",
2591           result.fail_reason, result.output, log=False)
2592
2593   return True
2594
2595
2596 def DemoteFromMC():
2597   """Demotes the current node from master candidate role.
2598
2599   """
2600   # try to ensure we're not the master by mistake
2601   master, myself = ssconf.GetMasterAndMyself()
2602   if master == myself:
2603     _Fail("ssconf status shows I'm the master node, will not demote")
2604
2605   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2606   if not result.failed:
2607     _Fail("The master daemon is running, will not demote")
2608
2609   try:
2610     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2611       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2612   except EnvironmentError, err:
2613     if err.errno != errno.ENOENT:
2614       _Fail("Error while backing up cluster file: %s", err, exc=True)
2615
2616   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2617
2618
2619 def _GetX509Filenames(cryptodir, name):
2620   """Returns the full paths for the private key and certificate.
2621
2622   """
2623   return (utils.PathJoin(cryptodir, name),
2624           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2625           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2626
2627
2628 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2629   """Creates a new X509 certificate for SSL/TLS.
2630
2631   @type validity: int
2632   @param validity: Validity in seconds
2633   @rtype: tuple; (string, string)
2634   @return: Certificate name and public part
2635
2636   """
2637   (key_pem, cert_pem) = \
2638     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2639                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2640
2641   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2642                               prefix="x509-%s-" % utils.TimestampForFilename())
2643   try:
2644     name = os.path.basename(cert_dir)
2645     assert len(name) > 5
2646
2647     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2648
2649     utils.WriteFile(key_file, mode=0400, data=key_pem)
2650     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2651
2652     # Never return private key as it shouldn't leave the node
2653     return (name, cert_pem)
2654   except Exception:
2655     shutil.rmtree(cert_dir, ignore_errors=True)
2656     raise
2657
2658
2659 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2660   """Removes a X509 certificate.
2661
2662   @type name: string
2663   @param name: Certificate name
2664
2665   """
2666   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2667
2668   utils.RemoveFile(key_file)
2669   utils.RemoveFile(cert_file)
2670
2671   try:
2672     os.rmdir(cert_dir)
2673   except EnvironmentError, err:
2674     _Fail("Cannot remove certificate directory '%s': %s",
2675           cert_dir, err)
2676
2677
2678 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2679   """Returns the command for the requested input/output.
2680
2681   @type instance: L{objects.Instance}
2682   @param instance: The instance object
2683   @param mode: Import/export mode
2684   @param ieio: Input/output type
2685   @param ieargs: Input/output arguments
2686
2687   """
2688   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2689
2690   env = None
2691   prefix = None
2692   suffix = None
2693   exp_size = None
2694
2695   if ieio == constants.IEIO_FILE:
2696     (filename, ) = ieargs
2697
2698     if not utils.IsNormAbsPath(filename):
2699       _Fail("Path '%s' is not normalized or absolute", filename)
2700
2701     directory = os.path.normpath(os.path.dirname(filename))
2702
2703     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2704         constants.EXPORT_DIR):
2705       _Fail("File '%s' is not under exports directory '%s'",
2706             filename, constants.EXPORT_DIR)
2707
2708     # Create directory
2709     utils.Makedirs(directory, mode=0750)
2710
2711     quoted_filename = utils.ShellQuote(filename)
2712
2713     if mode == constants.IEM_IMPORT:
2714       suffix = "> %s" % quoted_filename
2715     elif mode == constants.IEM_EXPORT:
2716       suffix = "< %s" % quoted_filename
2717
2718       # Retrieve file size
2719       try:
2720         st = os.stat(filename)
2721       except EnvironmentError, err:
2722         logging.error("Can't stat(2) %s: %s", filename, err)
2723       else:
2724         exp_size = utils.BytesToMebibyte(st.st_size)
2725
2726   elif ieio == constants.IEIO_RAW_DISK:
2727     (disk, ) = ieargs
2728
2729     real_disk = _OpenRealBD(disk)
2730
2731     if mode == constants.IEM_IMPORT:
2732       # we set here a smaller block size as, due to transport buffering, more
2733       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2734       # is not already there or we pass a wrong path; we use notrunc to no
2735       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2736       # much memory; this means that at best, we flush every 64k, which will
2737       # not be very fast
2738       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2739                                     " bs=%s oflag=dsync"),
2740                                     real_disk.dev_path,
2741                                     str(64 * 1024))
2742
2743     elif mode == constants.IEM_EXPORT:
2744       # the block size on the read dd is 1MiB to match our units
2745       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2746                                    real_disk.dev_path,
2747                                    str(1024 * 1024), # 1 MB
2748                                    str(disk.size))
2749       exp_size = disk.size
2750
2751   elif ieio == constants.IEIO_SCRIPT:
2752     (disk, disk_index, ) = ieargs
2753
2754     assert isinstance(disk_index, (int, long))
2755
2756     real_disk = _OpenRealBD(disk)
2757
2758     inst_os = OSFromDisk(instance.os)
2759     env = OSEnvironment(instance, inst_os)
2760
2761     if mode == constants.IEM_IMPORT:
2762       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2763       env["IMPORT_INDEX"] = str(disk_index)
2764       script = inst_os.import_script
2765
2766     elif mode == constants.IEM_EXPORT:
2767       env["EXPORT_DEVICE"] = real_disk.dev_path
2768       env["EXPORT_INDEX"] = str(disk_index)
2769       script = inst_os.export_script
2770
2771     # TODO: Pass special environment only to script
2772     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2773
2774     if mode == constants.IEM_IMPORT:
2775       suffix = "| %s" % script_cmd
2776
2777     elif mode == constants.IEM_EXPORT:
2778       prefix = "%s |" % script_cmd
2779
2780     # Let script predict size
2781     exp_size = constants.IE_CUSTOM_SIZE
2782
2783   else:
2784     _Fail("Invalid %s I/O mode %r", mode, ieio)
2785
2786   return (env, prefix, suffix, exp_size)
2787
2788
2789 def _CreateImportExportStatusDir(prefix):
2790   """Creates status directory for import/export.
2791
2792   """
2793   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2794                           prefix=("%s-%s-" %
2795                                   (prefix, utils.TimestampForFilename())))
2796
2797
2798 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2799   """Starts an import or export daemon.
2800
2801   @param mode: Import/output mode
2802   @type opts: L{objects.ImportExportOptions}
2803   @param opts: Daemon options
2804   @type host: string
2805   @param host: Remote host for export (None for import)
2806   @type port: int
2807   @param port: Remote port for export (None for import)
2808   @type instance: L{objects.Instance}
2809   @param instance: Instance object
2810   @param ieio: Input/output type
2811   @param ieioargs: Input/output arguments
2812
2813   """
2814   if mode == constants.IEM_IMPORT:
2815     prefix = "import"
2816
2817     if not (host is None and port is None):
2818       _Fail("Can not specify host or port on import")
2819
2820   elif mode == constants.IEM_EXPORT:
2821     prefix = "export"
2822
2823     if host is None or port is None:
2824       _Fail("Host and port must be specified for an export")
2825
2826   else:
2827     _Fail("Invalid mode %r", mode)
2828
2829   if (opts.key_name is None) ^ (opts.ca_pem is None):
2830     _Fail("Cluster certificate can only be used for both key and CA")
2831
2832   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2833     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2834
2835   if opts.key_name is None:
2836     # Use server.pem
2837     key_path = constants.NODED_CERT_FILE
2838     cert_path = constants.NODED_CERT_FILE
2839     assert opts.ca_pem is None
2840   else:
2841     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2842                                                  opts.key_name)
2843     assert opts.ca_pem is not None
2844
2845   for i in [key_path, cert_path]:
2846     if not os.path.exists(i):
2847       _Fail("File '%s' does not exist" % i)
2848
2849   status_dir = _CreateImportExportStatusDir(prefix)
2850   try:
2851     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2852     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2853     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2854
2855     if opts.ca_pem is None:
2856       # Use server.pem
2857       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2858     else:
2859       ca = opts.ca_pem
2860
2861     # Write CA file
2862     utils.WriteFile(ca_file, data=ca, mode=0400)
2863
2864     cmd = [
2865       constants.IMPORT_EXPORT_DAEMON,
2866       status_file, mode,
2867       "--key=%s" % key_path,
2868       "--cert=%s" % cert_path,
2869       "--ca=%s" % ca_file,
2870       ]
2871
2872     if host:
2873       cmd.append("--host=%s" % host)
2874
2875     if port:
2876       cmd.append("--port=%s" % port)
2877
2878     if opts.compress:
2879       cmd.append("--compress=%s" % opts.compress)
2880
2881     if opts.magic:
2882       cmd.append("--magic=%s" % opts.magic)
2883
2884     if exp_size is not None:
2885       cmd.append("--expected-size=%s" % exp_size)
2886
2887     if cmd_prefix:
2888       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2889
2890     if cmd_suffix:
2891       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2892
2893     if mode == constants.IEM_EXPORT:
2894       # Retry connection a few times when connecting to remote peer
2895       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
2896       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
2897     elif opts.connect_timeout is not None:
2898       assert mode == constants.IEM_IMPORT
2899       # Overall timeout for establishing connection while listening
2900       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
2901
2902     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2903
2904     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2905     # support for receiving a file descriptor for output
2906     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2907                       output=logfile)
2908
2909     # The import/export name is simply the status directory name
2910     return os.path.basename(status_dir)
2911
2912   except Exception:
2913     shutil.rmtree(status_dir, ignore_errors=True)
2914     raise
2915
2916
2917 def GetImportExportStatus(names):
2918   """Returns import/export daemon status.
2919
2920   @type names: sequence
2921   @param names: List of names
2922   @rtype: List of dicts
2923   @return: Returns a list of the state of each named import/export or None if a
2924            status couldn't be read
2925
2926   """
2927   result = []
2928
2929   for name in names:
2930     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2931                                  _IES_STATUS_FILE)
2932
2933     try:
2934       data = utils.ReadFile(status_file)
2935     except EnvironmentError, err:
2936       if err.errno != errno.ENOENT:
2937         raise
2938       data = None
2939
2940     if not data:
2941       result.append(None)
2942       continue
2943
2944     result.append(serializer.LoadJson(data))
2945
2946   return result
2947
2948
2949 def AbortImportExport(name):
2950   """Sends SIGTERM to a running import/export daemon.
2951
2952   """
2953   logging.info("Abort import/export %s", name)
2954
2955   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2956   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2957
2958   if pid:
2959     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2960                  name, pid)
2961     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2962
2963
2964 def CleanupImportExport(name):
2965   """Cleanup after an import or export.
2966
2967   If the import/export daemon is still running it's killed. Afterwards the
2968   whole status directory is removed.
2969
2970   """
2971   logging.info("Finalizing import/export %s", name)
2972
2973   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2974
2975   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2976
2977   if pid:
2978     logging.info("Import/export %s is still running with PID %s",
2979                  name, pid)
2980     utils.KillProcess(pid, waitpid=False)
2981
2982   shutil.rmtree(status_dir, ignore_errors=True)
2983
2984
2985 def _FindDisks(nodes_ip, disks):
2986   """Sets the physical ID on disks and returns the block devices.
2987
2988   """
2989   # set the correct physical ID
2990   my_name = netutils.Hostname.GetSysName()
2991   for cf in disks:
2992     cf.SetPhysicalID(my_name, nodes_ip)
2993
2994   bdevs = []
2995
2996   for cf in disks:
2997     rd = _RecursiveFindBD(cf)
2998     if rd is None:
2999       _Fail("Can't find device %s", cf)
3000     bdevs.append(rd)
3001   return bdevs
3002
3003
3004 def DrbdDisconnectNet(nodes_ip, disks):
3005   """Disconnects the network on a list of drbd devices.
3006
3007   """
3008   bdevs = _FindDisks(nodes_ip, disks)
3009
3010   # disconnect disks
3011   for rd in bdevs:
3012     try:
3013       rd.DisconnectNet()
3014     except errors.BlockDeviceError, err:
3015       _Fail("Can't change network configuration to standalone mode: %s",
3016             err, exc=True)
3017
3018
3019 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3020   """Attaches the network on a list of drbd devices.
3021
3022   """
3023   bdevs = _FindDisks(nodes_ip, disks)
3024
3025   if multimaster:
3026     for idx, rd in enumerate(bdevs):
3027       try:
3028         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3029       except EnvironmentError, err:
3030         _Fail("Can't create symlink: %s", err)
3031   # reconnect disks, switch to new master configuration and if
3032   # needed primary mode
3033   for rd in bdevs:
3034     try:
3035       rd.AttachNet(multimaster)
3036     except errors.BlockDeviceError, err:
3037       _Fail("Can't change network configuration: %s", err)
3038
3039   # wait until the disks are connected; we need to retry the re-attach
3040   # if the device becomes standalone, as this might happen if the one
3041   # node disconnects and reconnects in a different mode before the
3042   # other node reconnects; in this case, one or both of the nodes will
3043   # decide it has wrong configuration and switch to standalone
3044
3045   def _Attach():
3046     all_connected = True
3047
3048     for rd in bdevs:
3049       stats = rd.GetProcStatus()
3050
3051       all_connected = (all_connected and
3052                        (stats.is_connected or stats.is_in_resync))
3053
3054       if stats.is_standalone:
3055         # peer had different config info and this node became
3056         # standalone, even though this should not happen with the
3057         # new staged way of changing disk configs
3058         try:
3059           rd.AttachNet(multimaster)
3060         except errors.BlockDeviceError, err:
3061           _Fail("Can't change network configuration: %s", err)
3062
3063     if not all_connected:
3064       raise utils.RetryAgain()
3065
3066   try:
3067     # Start with a delay of 100 miliseconds and go up to 5 seconds
3068     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3069   except utils.RetryTimeout:
3070     _Fail("Timeout in disk reconnecting")
3071
3072   if multimaster:
3073     # change to primary mode
3074     for rd in bdevs:
3075       try:
3076         rd.Open()
3077       except errors.BlockDeviceError, err:
3078         _Fail("Can't change to primary mode: %s", err)
3079
3080
3081 def DrbdWaitSync(nodes_ip, disks):
3082   """Wait until DRBDs have synchronized.
3083
3084   """
3085   def _helper(rd):
3086     stats = rd.GetProcStatus()
3087     if not (stats.is_connected or stats.is_in_resync):
3088       raise utils.RetryAgain()
3089     return stats
3090
3091   bdevs = _FindDisks(nodes_ip, disks)
3092
3093   min_resync = 100
3094   alldone = True
3095   for rd in bdevs:
3096     try:
3097       # poll each second for 15 seconds
3098       stats = utils.Retry(_helper, 1, 15, args=[rd])
3099     except utils.RetryTimeout:
3100       stats = rd.GetProcStatus()
3101       # last check
3102       if not (stats.is_connected or stats.is_in_resync):
3103         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3104     alldone = alldone and (not stats.is_in_resync)
3105     if stats.sync_percent is not None:
3106       min_resync = min(min_resync, stats.sync_percent)
3107
3108   return (alldone, min_resync)
3109
3110
3111 def GetDrbdUsermodeHelper():
3112   """Returns DRBD usermode helper currently configured.
3113
3114   """
3115   try:
3116     return bdev.BaseDRBD.GetUsermodeHelper()
3117   except errors.BlockDeviceError, err:
3118     _Fail(str(err))
3119
3120
3121 def PowercycleNode(hypervisor_type):
3122   """Hard-powercycle the node.
3123
3124   Because we need to return first, and schedule the powercycle in the
3125   background, we won't be able to report failures nicely.
3126
3127   """
3128   hyper = hypervisor.GetHypervisor(hypervisor_type)
3129   try:
3130     pid = os.fork()
3131   except OSError:
3132     # if we can't fork, we'll pretend that we're in the child process
3133     pid = 0
3134   if pid > 0:
3135     return "Reboot scheduled in 5 seconds"
3136   # ensure the child is running on ram
3137   try:
3138     utils.Mlockall()
3139   except Exception: # pylint: disable-msg=W0703
3140     pass
3141   time.sleep(5)
3142   hyper.PowercycleNode()
3143
3144
3145 class HooksRunner(object):
3146   """Hook runner.
3147
3148   This class is instantiated on the node side (ganeti-noded) and not
3149   on the master side.
3150
3151   """
3152   def __init__(self, hooks_base_dir=None):
3153     """Constructor for hooks runner.
3154
3155     @type hooks_base_dir: str or None
3156     @param hooks_base_dir: if not None, this overrides the
3157         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3158
3159     """
3160     if hooks_base_dir is None:
3161       hooks_base_dir = constants.HOOKS_BASE_DIR
3162     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3163     # constant
3164     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3165
3166   def RunHooks(self, hpath, phase, env):
3167     """Run the scripts in the hooks directory.
3168
3169     @type hpath: str
3170     @param hpath: the path to the hooks directory which
3171         holds the scripts
3172     @type phase: str
3173     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3174         L{constants.HOOKS_PHASE_POST}
3175     @type env: dict
3176     @param env: dictionary with the environment for the hook
3177     @rtype: list
3178     @return: list of 3-element tuples:
3179       - script path
3180       - script result, either L{constants.HKR_SUCCESS} or
3181         L{constants.HKR_FAIL}
3182       - output of the script
3183
3184     @raise errors.ProgrammerError: for invalid input
3185         parameters
3186
3187     """
3188     if phase == constants.HOOKS_PHASE_PRE:
3189       suffix = "pre"
3190     elif phase == constants.HOOKS_PHASE_POST:
3191       suffix = "post"
3192     else:
3193       _Fail("Unknown hooks phase '%s'", phase)
3194
3195
3196     subdir = "%s-%s.d" % (hpath, suffix)
3197     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3198
3199     results = []
3200
3201     if not os.path.isdir(dir_name):
3202       # for non-existing/non-dirs, we simply exit instead of logging a
3203       # warning at every operation
3204       return results
3205
3206     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3207
3208     for (relname, relstatus, runresult)  in runparts_results:
3209       if relstatus == constants.RUNPARTS_SKIP:
3210         rrval = constants.HKR_SKIP
3211         output = ""
3212       elif relstatus == constants.RUNPARTS_ERR:
3213         rrval = constants.HKR_FAIL
3214         output = "Hook script execution error: %s" % runresult
3215       elif relstatus == constants.RUNPARTS_RUN:
3216         if runresult.failed:
3217           rrval = constants.HKR_FAIL
3218         else:
3219           rrval = constants.HKR_SUCCESS
3220         output = utils.SafeEncode(runresult.output.strip())
3221       results.append(("%s/%s" % (subdir, relname), rrval, output))
3222
3223     return results
3224
3225
3226 class IAllocatorRunner(object):
3227   """IAllocator runner.
3228
3229   This class is instantiated on the node side (ganeti-noded) and not on
3230   the master side.
3231
3232   """
3233   @staticmethod
3234   def Run(name, idata):
3235     """Run an iallocator script.
3236
3237     @type name: str
3238     @param name: the iallocator script name
3239     @type idata: str
3240     @param idata: the allocator input data
3241
3242     @rtype: tuple
3243     @return: two element tuple of:
3244        - status
3245        - either error message or stdout of allocator (for success)
3246
3247     """
3248     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3249                                   os.path.isfile)
3250     if alloc_script is None:
3251       _Fail("iallocator module '%s' not found in the search path", name)
3252
3253     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3254     try:
3255       os.write(fd, idata)
3256       os.close(fd)
3257       result = utils.RunCmd([alloc_script, fin_name])
3258       if result.failed:
3259         _Fail("iallocator module '%s' failed: %s, output '%s'",
3260               name, result.fail_reason, result.output)
3261     finally:
3262       os.unlink(fin_name)
3263
3264     return result.stdout
3265
3266
3267 class DevCacheManager(object):
3268   """Simple class for managing a cache of block device information.
3269
3270   """
3271   _DEV_PREFIX = "/dev/"
3272   _ROOT_DIR = constants.BDEV_CACHE_DIR
3273
3274   @classmethod
3275   def _ConvertPath(cls, dev_path):
3276     """Converts a /dev/name path to the cache file name.
3277
3278     This replaces slashes with underscores and strips the /dev
3279     prefix. It then returns the full path to the cache file.
3280
3281     @type dev_path: str
3282     @param dev_path: the C{/dev/} path name
3283     @rtype: str
3284     @return: the converted path name
3285
3286     """
3287     if dev_path.startswith(cls._DEV_PREFIX):
3288       dev_path = dev_path[len(cls._DEV_PREFIX):]
3289     dev_path = dev_path.replace("/", "_")
3290     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3291     return fpath
3292
3293   @classmethod
3294   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3295     """Updates the cache information for a given device.
3296
3297     @type dev_path: str
3298     @param dev_path: the pathname of the device
3299     @type owner: str
3300     @param owner: the owner (instance name) of the device
3301     @type on_primary: bool
3302     @param on_primary: whether this is the primary
3303         node nor not
3304     @type iv_name: str
3305     @param iv_name: the instance-visible name of the
3306         device, as in objects.Disk.iv_name
3307
3308     @rtype: None
3309
3310     """
3311     if dev_path is None:
3312       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3313       return
3314     fpath = cls._ConvertPath(dev_path)
3315     if on_primary:
3316       state = "primary"
3317     else:
3318       state = "secondary"
3319     if iv_name is None:
3320       iv_name = "not_visible"
3321     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3322     try:
3323       utils.WriteFile(fpath, data=fdata)
3324     except EnvironmentError, err:
3325       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3326
3327   @classmethod
3328   def RemoveCache(cls, dev_path):
3329     """Remove data for a dev_path.
3330
3331     This is just a wrapper over L{utils.RemoveFile} with a converted
3332     path name and logging.
3333
3334     @type dev_path: str
3335     @param dev_path: the pathname of the device
3336
3337     @rtype: None
3338
3339     """
3340     if dev_path is None:
3341       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3342       return
3343     fpath = cls._ConvertPath(dev_path)
3344     try:
3345       utils.RemoveFile(fpath)
3346     except EnvironmentError, err:
3347       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)