Change the meaning of call_node_start_master
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62
63
64 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
65 _ALLOWED_CLEAN_DIRS = frozenset([
66   constants.DATA_DIR,
67   constants.JOB_QUEUE_ARCHIVE_DIR,
68   constants.QUEUE_DIR,
69   constants.CRYPTO_KEYS_DIR,
70   ])
71 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
72 _X509_KEY_FILE = "key"
73 _X509_CERT_FILE = "cert"
74 _IES_STATUS_FILE = "status"
75 _IES_PID_FILE = "pid"
76 _IES_CA_FILE = "ca"
77
78
79 class RPCFail(Exception):
80   """Class denoting RPC failure.
81
82   Its argument is the error message.
83
84   """
85
86
87 def _Fail(msg, *args, **kwargs):
88   """Log an error and the raise an RPCFail exception.
89
90   This exception is then handled specially in the ganeti daemon and
91   turned into a 'failed' return type. As such, this function is a
92   useful shortcut for logging the error and returning it to the master
93   daemon.
94
95   @type msg: string
96   @param msg: the text of the exception
97   @raise RPCFail
98
99   """
100   if args:
101     msg = msg % args
102   if "log" not in kwargs or kwargs["log"]: # if we should log this error
103     if "exc" in kwargs and kwargs["exc"]:
104       logging.exception(msg)
105     else:
106       logging.error(msg)
107   raise RPCFail(msg)
108
109
110 def _GetConfig():
111   """Simple wrapper to return a SimpleStore.
112
113   @rtype: L{ssconf.SimpleStore}
114   @return: a SimpleStore instance
115
116   """
117   return ssconf.SimpleStore()
118
119
120 def _GetSshRunner(cluster_name):
121   """Simple wrapper to return an SshRunner.
122
123   @type cluster_name: str
124   @param cluster_name: the cluster name, which is needed
125       by the SshRunner constructor
126   @rtype: L{ssh.SshRunner}
127   @return: an SshRunner instance
128
129   """
130   return ssh.SshRunner(cluster_name)
131
132
133 def _Decompress(data):
134   """Unpacks data compressed by the RPC client.
135
136   @type data: list or tuple
137   @param data: Data sent by RPC client
138   @rtype: str
139   @return: Decompressed data
140
141   """
142   assert isinstance(data, (list, tuple))
143   assert len(data) == 2
144   (encoding, content) = data
145   if encoding == constants.RPC_ENCODING_NONE:
146     return content
147   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
148     return zlib.decompress(base64.b64decode(content))
149   else:
150     raise AssertionError("Unknown data encoding")
151
152
153 def _CleanDirectory(path, exclude=None):
154   """Removes all regular files in a directory.
155
156   @type path: str
157   @param path: the directory to clean
158   @type exclude: list
159   @param exclude: list of files to be excluded, defaults
160       to the empty list
161
162   """
163   if path not in _ALLOWED_CLEAN_DIRS:
164     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
165           path)
166
167   if not os.path.isdir(path):
168     return
169   if exclude is None:
170     exclude = []
171   else:
172     # Normalize excluded paths
173     exclude = [os.path.normpath(i) for i in exclude]
174
175   for rel_name in utils.ListVisibleFiles(path):
176     full_name = utils.PathJoin(path, rel_name)
177     if full_name in exclude:
178       continue
179     if os.path.isfile(full_name) and not os.path.islink(full_name):
180       utils.RemoveFile(full_name)
181
182
183 def _BuildUploadFileList():
184   """Build the list of allowed upload files.
185
186   This is abstracted so that it's built only once at module import time.
187
188   """
189   allowed_files = set([
190     constants.CLUSTER_CONF_FILE,
191     constants.ETC_HOSTS,
192     constants.SSH_KNOWN_HOSTS_FILE,
193     constants.VNC_PASSWORD_FILE,
194     constants.RAPI_CERT_FILE,
195     constants.RAPI_USERS_FILE,
196     constants.CONFD_HMAC_KEY,
197     constants.CLUSTER_DOMAIN_SECRET_FILE,
198     ])
199
200   for hv_name in constants.HYPER_TYPES:
201     hv_class = hypervisor.GetHypervisorClass(hv_name)
202     allowed_files.update(hv_class.GetAncillaryFiles())
203
204   return frozenset(allowed_files)
205
206
207 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
208
209
210 def JobQueuePurge():
211   """Removes job queue files and archived jobs.
212
213   @rtype: tuple
214   @return: True, None
215
216   """
217   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
218   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
219
220
221 def GetMasterInfo():
222   """Returns master information.
223
224   This is an utility function to compute master information, either
225   for consumption here or from the node daemon.
226
227   @rtype: tuple
228   @return: master_netdev, master_ip, master_name
229   @raise RPCFail: in case of errors
230
231   """
232   try:
233     cfg = _GetConfig()
234     master_netdev = cfg.GetMasterNetdev()
235     master_ip = cfg.GetMasterIP()
236     master_node = cfg.GetMasterNode()
237   except errors.ConfigurationError, err:
238     _Fail("Cluster configuration incomplete: %s", err, exc=True)
239   return (master_netdev, master_ip, master_node)
240
241
242 def StartMaster(start_daemons, no_voting):
243   """Activate local node as master node.
244
245   The function will either try activate the IP address of the master
246   (unless someone else has it) or also start the master daemons, based
247   on the start_daemons parameter.
248
249   @type start_daemons: boolean
250   @param start_daemons: whether to start the master daemons
251       (ganeti-masterd and ganeti-rapi), or (if false) activate the
252       master ip
253   @type no_voting: boolean
254   @param no_voting: whether to start ganeti-masterd without a node vote
255       (if start_daemons is True), but still non-interactively
256   @rtype: None
257
258   """
259   # GetMasterInfo will raise an exception if not able to return data
260   master_netdev, master_ip, _ = GetMasterInfo()
261
262   err_msgs = []
263   # either start the master and rapi daemons
264   if start_daemons:
265     if no_voting:
266       masterd_args = "--no-voting --yes-do-it"
267     else:
268       masterd_args = ""
269
270     env = {
271       "EXTRA_MASTERD_ARGS": masterd_args,
272       }
273
274     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
275     if result.failed:
276       msg = "Can't start Ganeti master: %s" % result.output
277       logging.error(msg)
278       err_msgs.append(msg)
279   # or activate the IP
280   else:
281     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
282       if netutils.OwnIpAddress(master_ip):
283         # we already have the ip:
284         logging.debug("Master IP already configured, doing nothing")
285       else:
286         msg = "Someone else has the master ip, not activating"
287         logging.error(msg)
288         err_msgs.append(msg)
289     else:
290       result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
291                              "dev", master_netdev, "label",
292                              "%s:0" % master_netdev])
293       if result.failed:
294         msg = "Can't activate master IP: %s" % result.output
295         logging.error(msg)
296         err_msgs.append(msg)
297
298       result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
299                              "-s", master_ip, master_ip])
300       # we'll ignore the exit code of arping
301
302   if err_msgs:
303     _Fail("; ".join(err_msgs))
304
305
306 def StopMaster(stop_daemons):
307   """Deactivate this node as master.
308
309   The function will always try to deactivate the IP address of the
310   master. It will also stop the master daemons depending on the
311   stop_daemons parameter.
312
313   @type stop_daemons: boolean
314   @param stop_daemons: whether to also stop the master daemons
315       (ganeti-masterd and ganeti-rapi)
316   @rtype: None
317
318   """
319   # TODO: log and report back to the caller the error failures; we
320   # need to decide in which case we fail the RPC for this
321
322   # GetMasterInfo will raise an exception if not able to return data
323   master_netdev, master_ip, _ = GetMasterInfo()
324
325   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
326                          "dev", master_netdev])
327   if result.failed:
328     logging.error("Can't remove the master IP, error: %s", result.output)
329     # but otherwise ignore the failure
330
331   if stop_daemons:
332     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
333     if result.failed:
334       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
335                     " and error %s",
336                     result.cmd, result.exit_code, result.output)
337
338
339 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
340   """Joins this node to the cluster.
341
342   This does the following:
343       - updates the hostkeys of the machine (rsa and dsa)
344       - adds the ssh private key to the user
345       - adds the ssh public key to the users' authorized_keys file
346
347   @type dsa: str
348   @param dsa: the DSA private key to write
349   @type dsapub: str
350   @param dsapub: the DSA public key to write
351   @type rsa: str
352   @param rsa: the RSA private key to write
353   @type rsapub: str
354   @param rsapub: the RSA public key to write
355   @type sshkey: str
356   @param sshkey: the SSH private key to write
357   @type sshpub: str
358   @param sshpub: the SSH public key to write
359   @rtype: boolean
360   @return: the success of the operation
361
362   """
363   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
364                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
365                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
366                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
367   for name, content, mode in sshd_keys:
368     utils.WriteFile(name, data=content, mode=mode)
369
370   try:
371     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
372                                                     mkdir=True)
373   except errors.OpExecError, err:
374     _Fail("Error while processing user ssh files: %s", err, exc=True)
375
376   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
377     utils.WriteFile(name, data=content, mode=0600)
378
379   utils.AddAuthorizedKey(auth_keys, sshpub)
380
381   result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
382   if result.failed:
383     _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
384           result.cmd, result.exit_code, result.output)
385
386
387 def LeaveCluster(modify_ssh_setup):
388   """Cleans up and remove the current node.
389
390   This function cleans up and prepares the current node to be removed
391   from the cluster.
392
393   If processing is successful, then it raises an
394   L{errors.QuitGanetiException} which is used as a special case to
395   shutdown the node daemon.
396
397   @param modify_ssh_setup: boolean
398
399   """
400   _CleanDirectory(constants.DATA_DIR)
401   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
402   JobQueuePurge()
403
404   if modify_ssh_setup:
405     try:
406       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
407
408       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
409
410       utils.RemoveFile(priv_key)
411       utils.RemoveFile(pub_key)
412     except errors.OpExecError:
413       logging.exception("Error while processing ssh files")
414
415   try:
416     utils.RemoveFile(constants.CONFD_HMAC_KEY)
417     utils.RemoveFile(constants.RAPI_CERT_FILE)
418     utils.RemoveFile(constants.NODED_CERT_FILE)
419   except: # pylint: disable-msg=W0702
420     logging.exception("Error while removing cluster secrets")
421
422   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
423   if result.failed:
424     logging.error("Command %s failed with exitcode %s and error %s",
425                   result.cmd, result.exit_code, result.output)
426
427   # Raise a custom exception (handled in ganeti-noded)
428   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
429
430
431 def GetNodeInfo(vgname, hypervisor_type):
432   """Gives back a hash with different information about the node.
433
434   @type vgname: C{string}
435   @param vgname: the name of the volume group to ask for disk space information
436   @type hypervisor_type: C{str}
437   @param hypervisor_type: the name of the hypervisor to ask for
438       memory information
439   @rtype: C{dict}
440   @return: dictionary with the following keys:
441       - vg_size is the size of the configured volume group in MiB
442       - vg_free is the free size of the volume group in MiB
443       - memory_dom0 is the memory allocated for domain0 in MiB
444       - memory_free is the currently available (free) ram in MiB
445       - memory_total is the total number of ram in MiB
446
447   """
448   outputarray = {}
449   vginfo = _GetVGInfo(vgname)
450   outputarray['vg_size'] = vginfo['vg_size']
451   outputarray['vg_free'] = vginfo['vg_free']
452
453   hyper = hypervisor.GetHypervisor(hypervisor_type)
454   hyp_info = hyper.GetNodeInfo()
455   if hyp_info is not None:
456     outputarray.update(hyp_info)
457
458   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
459
460   return outputarray
461
462
463 def VerifyNode(what, cluster_name):
464   """Verify the status of the local node.
465
466   Based on the input L{what} parameter, various checks are done on the
467   local node.
468
469   If the I{filelist} key is present, this list of
470   files is checksummed and the file/checksum pairs are returned.
471
472   If the I{nodelist} key is present, we check that we have
473   connectivity via ssh with the target nodes (and check the hostname
474   report).
475
476   If the I{node-net-test} key is present, we check that we have
477   connectivity to the given nodes via both primary IP and, if
478   applicable, secondary IPs.
479
480   @type what: C{dict}
481   @param what: a dictionary of things to check:
482       - filelist: list of files for which to compute checksums
483       - nodelist: list of nodes we should check ssh communication with
484       - node-net-test: list of nodes we should check node daemon port
485         connectivity with
486       - hypervisor: list with hypervisors to run the verify for
487   @rtype: dict
488   @return: a dictionary with the same keys as the input dict, and
489       values representing the result of the checks
490
491   """
492   result = {}
493   my_name = netutils.HostInfo().name
494   port = netutils.GetDaemonPort(constants.NODED)
495
496   if constants.NV_HYPERVISOR in what:
497     result[constants.NV_HYPERVISOR] = tmp = {}
498     for hv_name in what[constants.NV_HYPERVISOR]:
499       try:
500         val = hypervisor.GetHypervisor(hv_name).Verify()
501       except errors.HypervisorError, err:
502         val = "Error while checking hypervisor: %s" % str(err)
503       tmp[hv_name] = val
504
505   if constants.NV_FILELIST in what:
506     result[constants.NV_FILELIST] = utils.FingerprintFiles(
507       what[constants.NV_FILELIST])
508
509   if constants.NV_NODELIST in what:
510     result[constants.NV_NODELIST] = tmp = {}
511     random.shuffle(what[constants.NV_NODELIST])
512     for node in what[constants.NV_NODELIST]:
513       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
514       if not success:
515         tmp[node] = message
516
517   if constants.NV_NODENETTEST in what:
518     result[constants.NV_NODENETTEST] = tmp = {}
519     my_pip = my_sip = None
520     for name, pip, sip in what[constants.NV_NODENETTEST]:
521       if name == my_name:
522         my_pip = pip
523         my_sip = sip
524         break
525     if not my_pip:
526       tmp[my_name] = ("Can't find my own primary/secondary IP"
527                       " in the node list")
528     else:
529       for name, pip, sip in what[constants.NV_NODENETTEST]:
530         fail = []
531         if not netutils.TcpPing(pip, port, source=my_pip):
532           fail.append("primary")
533         if sip != pip:
534           if not netutils.TcpPing(sip, port, source=my_sip):
535             fail.append("secondary")
536         if fail:
537           tmp[name] = ("failure using the %s interface(s)" %
538                        " and ".join(fail))
539
540   if constants.NV_MASTERIP in what:
541     # FIXME: add checks on incoming data structures (here and in the
542     # rest of the function)
543     master_name, master_ip = what[constants.NV_MASTERIP]
544     if master_name == my_name:
545       source = constants.IP4_ADDRESS_LOCALHOST
546     else:
547       source = None
548     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
549                                                   source=source)
550
551   if constants.NV_LVLIST in what:
552     try:
553       val = GetVolumeList(what[constants.NV_LVLIST])
554     except RPCFail, err:
555       val = str(err)
556     result[constants.NV_LVLIST] = val
557
558   if constants.NV_INSTANCELIST in what:
559     # GetInstanceList can fail
560     try:
561       val = GetInstanceList(what[constants.NV_INSTANCELIST])
562     except RPCFail, err:
563       val = str(err)
564     result[constants.NV_INSTANCELIST] = val
565
566   if constants.NV_VGLIST in what:
567     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
568
569   if constants.NV_PVLIST in what:
570     result[constants.NV_PVLIST] = \
571       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
572                                    filter_allocatable=False)
573
574   if constants.NV_VERSION in what:
575     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
576                                     constants.RELEASE_VERSION)
577
578   if constants.NV_HVINFO in what:
579     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
580     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
581
582   if constants.NV_DRBDLIST in what:
583     try:
584       used_minors = bdev.DRBD8.GetUsedDevs().keys()
585     except errors.BlockDeviceError, err:
586       logging.warning("Can't get used minors list", exc_info=True)
587       used_minors = str(err)
588     result[constants.NV_DRBDLIST] = used_minors
589
590   if constants.NV_DRBDHELPER in what:
591     status = True
592     try:
593       payload = bdev.BaseDRBD.GetUsermodeHelper()
594     except errors.BlockDeviceError, err:
595       logging.error("Can't get DRBD usermode helper: %s", str(err))
596       status = False
597       payload = str(err)
598     result[constants.NV_DRBDHELPER] = (status, payload)
599
600   if constants.NV_NODESETUP in what:
601     result[constants.NV_NODESETUP] = tmpr = []
602     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
603       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
604                   " under /sys, missing required directories /sys/block"
605                   " and /sys/class/net")
606     if (not os.path.isdir("/proc/sys") or
607         not os.path.isfile("/proc/sysrq-trigger")):
608       tmpr.append("The procfs filesystem doesn't seem to be mounted"
609                   " under /proc, missing required directory /proc/sys and"
610                   " the file /proc/sysrq-trigger")
611
612   if constants.NV_TIME in what:
613     result[constants.NV_TIME] = utils.SplitTime(time.time())
614
615   if constants.NV_OSLIST in what:
616     result[constants.NV_OSLIST] = DiagnoseOS()
617
618   return result
619
620
621 def GetVolumeList(vg_name):
622   """Compute list of logical volumes and their size.
623
624   @type vg_name: str
625   @param vg_name: the volume group whose LVs we should list
626   @rtype: dict
627   @return:
628       dictionary of all partions (key) with value being a tuple of
629       their size (in MiB), inactive and online status::
630
631         {'test1': ('20.06', True, True)}
632
633       in case of errors, a string is returned with the error
634       details.
635
636   """
637   lvs = {}
638   sep = '|'
639   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
640                          "--separator=%s" % sep,
641                          "-olv_name,lv_size,lv_attr", vg_name])
642   if result.failed:
643     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
644
645   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
646   for line in result.stdout.splitlines():
647     line = line.strip()
648     match = valid_line_re.match(line)
649     if not match:
650       logging.error("Invalid line returned from lvs output: '%s'", line)
651       continue
652     name, size, attr = match.groups()
653     inactive = attr[4] == '-'
654     online = attr[5] == 'o'
655     virtual = attr[0] == 'v'
656     if virtual:
657       # we don't want to report such volumes as existing, since they
658       # don't really hold data
659       continue
660     lvs[name] = (size, inactive, online)
661
662   return lvs
663
664
665 def ListVolumeGroups():
666   """List the volume groups and their size.
667
668   @rtype: dict
669   @return: dictionary with keys volume name and values the
670       size of the volume
671
672   """
673   return utils.ListVolumeGroups()
674
675
676 def NodeVolumes():
677   """List all volumes on this node.
678
679   @rtype: list
680   @return:
681     A list of dictionaries, each having four keys:
682       - name: the logical volume name,
683       - size: the size of the logical volume
684       - dev: the physical device on which the LV lives
685       - vg: the volume group to which it belongs
686
687     In case of errors, we return an empty list and log the
688     error.
689
690     Note that since a logical volume can live on multiple physical
691     volumes, the resulting list might include a logical volume
692     multiple times.
693
694   """
695   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
696                          "--separator=|",
697                          "--options=lv_name,lv_size,devices,vg_name"])
698   if result.failed:
699     _Fail("Failed to list logical volumes, lvs output: %s",
700           result.output)
701
702   def parse_dev(dev):
703     return dev.split('(')[0]
704
705   def handle_dev(dev):
706     return [parse_dev(x) for x in dev.split(",")]
707
708   def map_line(line):
709     line = [v.strip() for v in line]
710     return [{'name': line[0], 'size': line[1],
711              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
712
713   all_devs = []
714   for line in result.stdout.splitlines():
715     if line.count('|') >= 3:
716       all_devs.extend(map_line(line.split('|')))
717     else:
718       logging.warning("Strange line in the output from lvs: '%s'", line)
719   return all_devs
720
721
722 def BridgesExist(bridges_list):
723   """Check if a list of bridges exist on the current node.
724
725   @rtype: boolean
726   @return: C{True} if all of them exist, C{False} otherwise
727
728   """
729   missing = []
730   for bridge in bridges_list:
731     if not utils.BridgeExists(bridge):
732       missing.append(bridge)
733
734   if missing:
735     _Fail("Missing bridges %s", utils.CommaJoin(missing))
736
737
738 def GetInstanceList(hypervisor_list):
739   """Provides a list of instances.
740
741   @type hypervisor_list: list
742   @param hypervisor_list: the list of hypervisors to query information
743
744   @rtype: list
745   @return: a list of all running instances on the current node
746     - instance1.example.com
747     - instance2.example.com
748
749   """
750   results = []
751   for hname in hypervisor_list:
752     try:
753       names = hypervisor.GetHypervisor(hname).ListInstances()
754       results.extend(names)
755     except errors.HypervisorError, err:
756       _Fail("Error enumerating instances (hypervisor %s): %s",
757             hname, err, exc=True)
758
759   return results
760
761
762 def GetInstanceInfo(instance, hname):
763   """Gives back the information about an instance as a dictionary.
764
765   @type instance: string
766   @param instance: the instance name
767   @type hname: string
768   @param hname: the hypervisor type of the instance
769
770   @rtype: dict
771   @return: dictionary with the following keys:
772       - memory: memory size of instance (int)
773       - state: xen state of instance (string)
774       - time: cpu time of instance (float)
775
776   """
777   output = {}
778
779   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
780   if iinfo is not None:
781     output['memory'] = iinfo[2]
782     output['state'] = iinfo[4]
783     output['time'] = iinfo[5]
784
785   return output
786
787
788 def GetInstanceMigratable(instance):
789   """Gives whether an instance can be migrated.
790
791   @type instance: L{objects.Instance}
792   @param instance: object representing the instance to be checked.
793
794   @rtype: tuple
795   @return: tuple of (result, description) where:
796       - result: whether the instance can be migrated or not
797       - description: a description of the issue, if relevant
798
799   """
800   hyper = hypervisor.GetHypervisor(instance.hypervisor)
801   iname = instance.name
802   if iname not in hyper.ListInstances():
803     _Fail("Instance %s is not running", iname)
804
805   for idx in range(len(instance.disks)):
806     link_name = _GetBlockDevSymlinkPath(iname, idx)
807     if not os.path.islink(link_name):
808       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
809
810
811 def GetAllInstancesInfo(hypervisor_list):
812   """Gather data about all instances.
813
814   This is the equivalent of L{GetInstanceInfo}, except that it
815   computes data for all instances at once, thus being faster if one
816   needs data about more than one instance.
817
818   @type hypervisor_list: list
819   @param hypervisor_list: list of hypervisors to query for instance data
820
821   @rtype: dict
822   @return: dictionary of instance: data, with data having the following keys:
823       - memory: memory size of instance (int)
824       - state: xen state of instance (string)
825       - time: cpu time of instance (float)
826       - vcpus: the number of vcpus
827
828   """
829   output = {}
830
831   for hname in hypervisor_list:
832     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
833     if iinfo:
834       for name, _, memory, vcpus, state, times in iinfo:
835         value = {
836           'memory': memory,
837           'vcpus': vcpus,
838           'state': state,
839           'time': times,
840           }
841         if name in output:
842           # we only check static parameters, like memory and vcpus,
843           # and not state and time which can change between the
844           # invocations of the different hypervisors
845           for key in 'memory', 'vcpus':
846             if value[key] != output[name][key]:
847               _Fail("Instance %s is running twice"
848                     " with different parameters", name)
849         output[name] = value
850
851   return output
852
853
854 def _InstanceLogName(kind, os_name, instance):
855   """Compute the OS log filename for a given instance and operation.
856
857   The instance name and os name are passed in as strings since not all
858   operations have these as part of an instance object.
859
860   @type kind: string
861   @param kind: the operation type (e.g. add, import, etc.)
862   @type os_name: string
863   @param os_name: the os name
864   @type instance: string
865   @param instance: the name of the instance being imported/added/etc.
866
867   """
868   # TODO: Use tempfile.mkstemp to create unique filename
869   base = ("%s-%s-%s-%s.log" %
870           (kind, os_name, instance, utils.TimestampForFilename()))
871   return utils.PathJoin(constants.LOG_OS_DIR, base)
872
873
874 def InstanceOsAdd(instance, reinstall, debug):
875   """Add an OS to an instance.
876
877   @type instance: L{objects.Instance}
878   @param instance: Instance whose OS is to be installed
879   @type reinstall: boolean
880   @param reinstall: whether this is an instance reinstall
881   @type debug: integer
882   @param debug: debug level, passed to the OS scripts
883   @rtype: None
884
885   """
886   inst_os = OSFromDisk(instance.os)
887
888   create_env = OSEnvironment(instance, inst_os, debug)
889   if reinstall:
890     create_env['INSTANCE_REINSTALL'] = "1"
891
892   logfile = _InstanceLogName("add", instance.os, instance.name)
893
894   result = utils.RunCmd([inst_os.create_script], env=create_env,
895                         cwd=inst_os.path, output=logfile,)
896   if result.failed:
897     logging.error("os create command '%s' returned error: %s, logfile: %s,"
898                   " output: %s", result.cmd, result.fail_reason, logfile,
899                   result.output)
900     lines = [utils.SafeEncode(val)
901              for val in utils.TailFile(logfile, lines=20)]
902     _Fail("OS create script failed (%s), last lines in the"
903           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
904
905
906 def RunRenameInstance(instance, old_name, debug):
907   """Run the OS rename script for an instance.
908
909   @type instance: L{objects.Instance}
910   @param instance: Instance whose OS is to be installed
911   @type old_name: string
912   @param old_name: previous instance name
913   @type debug: integer
914   @param debug: debug level, passed to the OS scripts
915   @rtype: boolean
916   @return: the success of the operation
917
918   """
919   inst_os = OSFromDisk(instance.os)
920
921   rename_env = OSEnvironment(instance, inst_os, debug)
922   rename_env['OLD_INSTANCE_NAME'] = old_name
923
924   logfile = _InstanceLogName("rename", instance.os,
925                              "%s-%s" % (old_name, instance.name))
926
927   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
928                         cwd=inst_os.path, output=logfile)
929
930   if result.failed:
931     logging.error("os create command '%s' returned error: %s output: %s",
932                   result.cmd, result.fail_reason, result.output)
933     lines = [utils.SafeEncode(val)
934              for val in utils.TailFile(logfile, lines=20)]
935     _Fail("OS rename script failed (%s), last lines in the"
936           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
937
938
939 def _GetVGInfo(vg_name):
940   """Get information about the volume group.
941
942   @type vg_name: str
943   @param vg_name: the volume group which we query
944   @rtype: dict
945   @return:
946     A dictionary with the following keys:
947       - C{vg_size} is the total size of the volume group in MiB
948       - C{vg_free} is the free size of the volume group in MiB
949       - C{pv_count} are the number of physical disks in that VG
950
951     If an error occurs during gathering of data, we return the same dict
952     with keys all set to None.
953
954   """
955   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
956
957   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
958                          "--nosuffix", "--units=m", "--separator=:", vg_name])
959
960   if retval.failed:
961     logging.error("volume group %s not present", vg_name)
962     return retdic
963   valarr = retval.stdout.strip().rstrip(':').split(':')
964   if len(valarr) == 3:
965     try:
966       retdic = {
967         "vg_size": int(round(float(valarr[0]), 0)),
968         "vg_free": int(round(float(valarr[1]), 0)),
969         "pv_count": int(valarr[2]),
970         }
971     except (TypeError, ValueError), err:
972       logging.exception("Fail to parse vgs output: %s", err)
973   else:
974     logging.error("vgs output has the wrong number of fields (expected"
975                   " three): %s", str(valarr))
976   return retdic
977
978
979 def _GetBlockDevSymlinkPath(instance_name, idx):
980   return utils.PathJoin(constants.DISK_LINKS_DIR,
981                         "%s:%d" % (instance_name, idx))
982
983
984 def _SymlinkBlockDev(instance_name, device_path, idx):
985   """Set up symlinks to a instance's block device.
986
987   This is an auxiliary function run when an instance is start (on the primary
988   node) or when an instance is migrated (on the target node).
989
990
991   @param instance_name: the name of the target instance
992   @param device_path: path of the physical block device, on the node
993   @param idx: the disk index
994   @return: absolute path to the disk's symlink
995
996   """
997   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
998   try:
999     os.symlink(device_path, link_name)
1000   except OSError, err:
1001     if err.errno == errno.EEXIST:
1002       if (not os.path.islink(link_name) or
1003           os.readlink(link_name) != device_path):
1004         os.remove(link_name)
1005         os.symlink(device_path, link_name)
1006     else:
1007       raise
1008
1009   return link_name
1010
1011
1012 def _RemoveBlockDevLinks(instance_name, disks):
1013   """Remove the block device symlinks belonging to the given instance.
1014
1015   """
1016   for idx, _ in enumerate(disks):
1017     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1018     if os.path.islink(link_name):
1019       try:
1020         os.remove(link_name)
1021       except OSError:
1022         logging.exception("Can't remove symlink '%s'", link_name)
1023
1024
1025 def _GatherAndLinkBlockDevs(instance):
1026   """Set up an instance's block device(s).
1027
1028   This is run on the primary node at instance startup. The block
1029   devices must be already assembled.
1030
1031   @type instance: L{objects.Instance}
1032   @param instance: the instance whose disks we shoul assemble
1033   @rtype: list
1034   @return: list of (disk_object, device_path)
1035
1036   """
1037   block_devices = []
1038   for idx, disk in enumerate(instance.disks):
1039     device = _RecursiveFindBD(disk)
1040     if device is None:
1041       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1042                                     str(disk))
1043     device.Open()
1044     try:
1045       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1046     except OSError, e:
1047       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1048                                     e.strerror)
1049
1050     block_devices.append((disk, link_name))
1051
1052   return block_devices
1053
1054
1055 def StartInstance(instance):
1056   """Start an instance.
1057
1058   @type instance: L{objects.Instance}
1059   @param instance: the instance object
1060   @rtype: None
1061
1062   """
1063   running_instances = GetInstanceList([instance.hypervisor])
1064
1065   if instance.name in running_instances:
1066     logging.info("Instance %s already running, not starting", instance.name)
1067     return
1068
1069   try:
1070     block_devices = _GatherAndLinkBlockDevs(instance)
1071     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1072     hyper.StartInstance(instance, block_devices)
1073   except errors.BlockDeviceError, err:
1074     _Fail("Block device error: %s", err, exc=True)
1075   except errors.HypervisorError, err:
1076     _RemoveBlockDevLinks(instance.name, instance.disks)
1077     _Fail("Hypervisor error: %s", err, exc=True)
1078
1079
1080 def InstanceShutdown(instance, timeout):
1081   """Shut an instance down.
1082
1083   @note: this functions uses polling with a hardcoded timeout.
1084
1085   @type instance: L{objects.Instance}
1086   @param instance: the instance object
1087   @type timeout: integer
1088   @param timeout: maximum timeout for soft shutdown
1089   @rtype: None
1090
1091   """
1092   hv_name = instance.hypervisor
1093   hyper = hypervisor.GetHypervisor(hv_name)
1094   iname = instance.name
1095
1096   if instance.name not in hyper.ListInstances():
1097     logging.info("Instance %s not running, doing nothing", iname)
1098     return
1099
1100   class _TryShutdown:
1101     def __init__(self):
1102       self.tried_once = False
1103
1104     def __call__(self):
1105       if iname not in hyper.ListInstances():
1106         return
1107
1108       try:
1109         hyper.StopInstance(instance, retry=self.tried_once)
1110       except errors.HypervisorError, err:
1111         if iname not in hyper.ListInstances():
1112           # if the instance is no longer existing, consider this a
1113           # success and go to cleanup
1114           return
1115
1116         _Fail("Failed to stop instance %s: %s", iname, err)
1117
1118       self.tried_once = True
1119
1120       raise utils.RetryAgain()
1121
1122   try:
1123     utils.Retry(_TryShutdown(), 5, timeout)
1124   except utils.RetryTimeout:
1125     # the shutdown did not succeed
1126     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1127
1128     try:
1129       hyper.StopInstance(instance, force=True)
1130     except errors.HypervisorError, err:
1131       if iname in hyper.ListInstances():
1132         # only raise an error if the instance still exists, otherwise
1133         # the error could simply be "instance ... unknown"!
1134         _Fail("Failed to force stop instance %s: %s", iname, err)
1135
1136     time.sleep(1)
1137
1138     if iname in hyper.ListInstances():
1139       _Fail("Could not shutdown instance %s even by destroy", iname)
1140
1141   try:
1142     hyper.CleanupInstance(instance.name)
1143   except errors.HypervisorError, err:
1144     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1145
1146   _RemoveBlockDevLinks(iname, instance.disks)
1147
1148
1149 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1150   """Reboot an instance.
1151
1152   @type instance: L{objects.Instance}
1153   @param instance: the instance object to reboot
1154   @type reboot_type: str
1155   @param reboot_type: the type of reboot, one the following
1156     constants:
1157       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1158         instance OS, do not recreate the VM
1159       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1160         restart the VM (at the hypervisor level)
1161       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1162         not accepted here, since that mode is handled differently, in
1163         cmdlib, and translates into full stop and start of the
1164         instance (instead of a call_instance_reboot RPC)
1165   @type shutdown_timeout: integer
1166   @param shutdown_timeout: maximum timeout for soft shutdown
1167   @rtype: None
1168
1169   """
1170   running_instances = GetInstanceList([instance.hypervisor])
1171
1172   if instance.name not in running_instances:
1173     _Fail("Cannot reboot instance %s that is not running", instance.name)
1174
1175   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1176   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1177     try:
1178       hyper.RebootInstance(instance)
1179     except errors.HypervisorError, err:
1180       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1181   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1182     try:
1183       InstanceShutdown(instance, shutdown_timeout)
1184       return StartInstance(instance)
1185     except errors.HypervisorError, err:
1186       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1187   else:
1188     _Fail("Invalid reboot_type received: %s", reboot_type)
1189
1190
1191 def MigrationInfo(instance):
1192   """Gather information about an instance to be migrated.
1193
1194   @type instance: L{objects.Instance}
1195   @param instance: the instance definition
1196
1197   """
1198   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1199   try:
1200     info = hyper.MigrationInfo(instance)
1201   except errors.HypervisorError, err:
1202     _Fail("Failed to fetch migration information: %s", err, exc=True)
1203   return info
1204
1205
1206 def AcceptInstance(instance, info, target):
1207   """Prepare the node to accept an instance.
1208
1209   @type instance: L{objects.Instance}
1210   @param instance: the instance definition
1211   @type info: string/data (opaque)
1212   @param info: migration information, from the source node
1213   @type target: string
1214   @param target: target host (usually ip), on this node
1215
1216   """
1217   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1218   try:
1219     hyper.AcceptInstance(instance, info, target)
1220   except errors.HypervisorError, err:
1221     _Fail("Failed to accept instance: %s", err, exc=True)
1222
1223
1224 def FinalizeMigration(instance, info, success):
1225   """Finalize any preparation to accept an instance.
1226
1227   @type instance: L{objects.Instance}
1228   @param instance: the instance definition
1229   @type info: string/data (opaque)
1230   @param info: migration information, from the source node
1231   @type success: boolean
1232   @param success: whether the migration was a success or a failure
1233
1234   """
1235   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1236   try:
1237     hyper.FinalizeMigration(instance, info, success)
1238   except errors.HypervisorError, err:
1239     _Fail("Failed to finalize migration: %s", err, exc=True)
1240
1241
1242 def MigrateInstance(instance, target, live):
1243   """Migrates an instance to another node.
1244
1245   @type instance: L{objects.Instance}
1246   @param instance: the instance definition
1247   @type target: string
1248   @param target: the target node name
1249   @type live: boolean
1250   @param live: whether the migration should be done live or not (the
1251       interpretation of this parameter is left to the hypervisor)
1252   @rtype: tuple
1253   @return: a tuple of (success, msg) where:
1254       - succes is a boolean denoting the success/failure of the operation
1255       - msg is a string with details in case of failure
1256
1257   """
1258   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1259
1260   try:
1261     hyper.MigrateInstance(instance, target, live)
1262   except errors.HypervisorError, err:
1263     _Fail("Failed to migrate instance: %s", err, exc=True)
1264
1265
1266 def BlockdevCreate(disk, size, owner, on_primary, info):
1267   """Creates a block device for an instance.
1268
1269   @type disk: L{objects.Disk}
1270   @param disk: the object describing the disk we should create
1271   @type size: int
1272   @param size: the size of the physical underlying device, in MiB
1273   @type owner: str
1274   @param owner: the name of the instance for which disk is created,
1275       used for device cache data
1276   @type on_primary: boolean
1277   @param on_primary:  indicates if it is the primary node or not
1278   @type info: string
1279   @param info: string that will be sent to the physical device
1280       creation, used for example to set (LVM) tags on LVs
1281
1282   @return: the new unique_id of the device (this can sometime be
1283       computed only after creation), or None. On secondary nodes,
1284       it's not required to return anything.
1285
1286   """
1287   # TODO: remove the obsolete 'size' argument
1288   # pylint: disable-msg=W0613
1289   clist = []
1290   if disk.children:
1291     for child in disk.children:
1292       try:
1293         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1294       except errors.BlockDeviceError, err:
1295         _Fail("Can't assemble device %s: %s", child, err)
1296       if on_primary or disk.AssembleOnSecondary():
1297         # we need the children open in case the device itself has to
1298         # be assembled
1299         try:
1300           # pylint: disable-msg=E1103
1301           crdev.Open()
1302         except errors.BlockDeviceError, err:
1303           _Fail("Can't make child '%s' read-write: %s", child, err)
1304       clist.append(crdev)
1305
1306   try:
1307     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1308   except errors.BlockDeviceError, err:
1309     _Fail("Can't create block device: %s", err)
1310
1311   if on_primary or disk.AssembleOnSecondary():
1312     try:
1313       device.Assemble()
1314     except errors.BlockDeviceError, err:
1315       _Fail("Can't assemble device after creation, unusual event: %s", err)
1316     device.SetSyncSpeed(constants.SYNC_SPEED)
1317     if on_primary or disk.OpenOnSecondary():
1318       try:
1319         device.Open(force=True)
1320       except errors.BlockDeviceError, err:
1321         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1322     DevCacheManager.UpdateCache(device.dev_path, owner,
1323                                 on_primary, disk.iv_name)
1324
1325   device.SetInfo(info)
1326
1327   return device.unique_id
1328
1329
1330 def BlockdevRemove(disk):
1331   """Remove a block device.
1332
1333   @note: This is intended to be called recursively.
1334
1335   @type disk: L{objects.Disk}
1336   @param disk: the disk object we should remove
1337   @rtype: boolean
1338   @return: the success of the operation
1339
1340   """
1341   msgs = []
1342   try:
1343     rdev = _RecursiveFindBD(disk)
1344   except errors.BlockDeviceError, err:
1345     # probably can't attach
1346     logging.info("Can't attach to device %s in remove", disk)
1347     rdev = None
1348   if rdev is not None:
1349     r_path = rdev.dev_path
1350     try:
1351       rdev.Remove()
1352     except errors.BlockDeviceError, err:
1353       msgs.append(str(err))
1354     if not msgs:
1355       DevCacheManager.RemoveCache(r_path)
1356
1357   if disk.children:
1358     for child in disk.children:
1359       try:
1360         BlockdevRemove(child)
1361       except RPCFail, err:
1362         msgs.append(str(err))
1363
1364   if msgs:
1365     _Fail("; ".join(msgs))
1366
1367
1368 def _RecursiveAssembleBD(disk, owner, as_primary):
1369   """Activate a block device for an instance.
1370
1371   This is run on the primary and secondary nodes for an instance.
1372
1373   @note: this function is called recursively.
1374
1375   @type disk: L{objects.Disk}
1376   @param disk: the disk we try to assemble
1377   @type owner: str
1378   @param owner: the name of the instance which owns the disk
1379   @type as_primary: boolean
1380   @param as_primary: if we should make the block device
1381       read/write
1382
1383   @return: the assembled device or None (in case no device
1384       was assembled)
1385   @raise errors.BlockDeviceError: in case there is an error
1386       during the activation of the children or the device
1387       itself
1388
1389   """
1390   children = []
1391   if disk.children:
1392     mcn = disk.ChildrenNeeded()
1393     if mcn == -1:
1394       mcn = 0 # max number of Nones allowed
1395     else:
1396       mcn = len(disk.children) - mcn # max number of Nones
1397     for chld_disk in disk.children:
1398       try:
1399         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1400       except errors.BlockDeviceError, err:
1401         if children.count(None) >= mcn:
1402           raise
1403         cdev = None
1404         logging.error("Error in child activation (but continuing): %s",
1405                       str(err))
1406       children.append(cdev)
1407
1408   if as_primary or disk.AssembleOnSecondary():
1409     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1410     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1411     result = r_dev
1412     if as_primary or disk.OpenOnSecondary():
1413       r_dev.Open()
1414     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1415                                 as_primary, disk.iv_name)
1416
1417   else:
1418     result = True
1419   return result
1420
1421
1422 def BlockdevAssemble(disk, owner, as_primary):
1423   """Activate a block device for an instance.
1424
1425   This is a wrapper over _RecursiveAssembleBD.
1426
1427   @rtype: str or boolean
1428   @return: a C{/dev/...} path for primary nodes, and
1429       C{True} for secondary nodes
1430
1431   """
1432   try:
1433     result = _RecursiveAssembleBD(disk, owner, as_primary)
1434     if isinstance(result, bdev.BlockDev):
1435       # pylint: disable-msg=E1103
1436       result = result.dev_path
1437   except errors.BlockDeviceError, err:
1438     _Fail("Error while assembling disk: %s", err, exc=True)
1439
1440   return result
1441
1442
1443 def BlockdevShutdown(disk):
1444   """Shut down a block device.
1445
1446   First, if the device is assembled (Attach() is successful), then
1447   the device is shutdown. Then the children of the device are
1448   shutdown.
1449
1450   This function is called recursively. Note that we don't cache the
1451   children or such, as oppossed to assemble, shutdown of different
1452   devices doesn't require that the upper device was active.
1453
1454   @type disk: L{objects.Disk}
1455   @param disk: the description of the disk we should
1456       shutdown
1457   @rtype: None
1458
1459   """
1460   msgs = []
1461   r_dev = _RecursiveFindBD(disk)
1462   if r_dev is not None:
1463     r_path = r_dev.dev_path
1464     try:
1465       r_dev.Shutdown()
1466       DevCacheManager.RemoveCache(r_path)
1467     except errors.BlockDeviceError, err:
1468       msgs.append(str(err))
1469
1470   if disk.children:
1471     for child in disk.children:
1472       try:
1473         BlockdevShutdown(child)
1474       except RPCFail, err:
1475         msgs.append(str(err))
1476
1477   if msgs:
1478     _Fail("; ".join(msgs))
1479
1480
1481 def BlockdevAddchildren(parent_cdev, new_cdevs):
1482   """Extend a mirrored block device.
1483
1484   @type parent_cdev: L{objects.Disk}
1485   @param parent_cdev: the disk to which we should add children
1486   @type new_cdevs: list of L{objects.Disk}
1487   @param new_cdevs: the list of children which we should add
1488   @rtype: None
1489
1490   """
1491   parent_bdev = _RecursiveFindBD(parent_cdev)
1492   if parent_bdev is None:
1493     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1494   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1495   if new_bdevs.count(None) > 0:
1496     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1497   parent_bdev.AddChildren(new_bdevs)
1498
1499
1500 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1501   """Shrink a mirrored block device.
1502
1503   @type parent_cdev: L{objects.Disk}
1504   @param parent_cdev: the disk from which we should remove children
1505   @type new_cdevs: list of L{objects.Disk}
1506   @param new_cdevs: the list of children which we should remove
1507   @rtype: None
1508
1509   """
1510   parent_bdev = _RecursiveFindBD(parent_cdev)
1511   if parent_bdev is None:
1512     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1513   devs = []
1514   for disk in new_cdevs:
1515     rpath = disk.StaticDevPath()
1516     if rpath is None:
1517       bd = _RecursiveFindBD(disk)
1518       if bd is None:
1519         _Fail("Can't find device %s while removing children", disk)
1520       else:
1521         devs.append(bd.dev_path)
1522     else:
1523       if not utils.IsNormAbsPath(rpath):
1524         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1525       devs.append(rpath)
1526   parent_bdev.RemoveChildren(devs)
1527
1528
1529 def BlockdevGetmirrorstatus(disks):
1530   """Get the mirroring status of a list of devices.
1531
1532   @type disks: list of L{objects.Disk}
1533   @param disks: the list of disks which we should query
1534   @rtype: disk
1535   @return:
1536       a list of (mirror_done, estimated_time) tuples, which
1537       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1538   @raise errors.BlockDeviceError: if any of the disks cannot be
1539       found
1540
1541   """
1542   stats = []
1543   for dsk in disks:
1544     rbd = _RecursiveFindBD(dsk)
1545     if rbd is None:
1546       _Fail("Can't find device %s", dsk)
1547
1548     stats.append(rbd.CombinedSyncStatus())
1549
1550   return stats
1551
1552
1553 def _RecursiveFindBD(disk):
1554   """Check if a device is activated.
1555
1556   If so, return information about the real device.
1557
1558   @type disk: L{objects.Disk}
1559   @param disk: the disk object we need to find
1560
1561   @return: None if the device can't be found,
1562       otherwise the device instance
1563
1564   """
1565   children = []
1566   if disk.children:
1567     for chdisk in disk.children:
1568       children.append(_RecursiveFindBD(chdisk))
1569
1570   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1571
1572
1573 def _OpenRealBD(disk):
1574   """Opens the underlying block device of a disk.
1575
1576   @type disk: L{objects.Disk}
1577   @param disk: the disk object we want to open
1578
1579   """
1580   real_disk = _RecursiveFindBD(disk)
1581   if real_disk is None:
1582     _Fail("Block device '%s' is not set up", disk)
1583
1584   real_disk.Open()
1585
1586   return real_disk
1587
1588
1589 def BlockdevFind(disk):
1590   """Check if a device is activated.
1591
1592   If it is, return information about the real device.
1593
1594   @type disk: L{objects.Disk}
1595   @param disk: the disk to find
1596   @rtype: None or objects.BlockDevStatus
1597   @return: None if the disk cannot be found, otherwise a the current
1598            information
1599
1600   """
1601   try:
1602     rbd = _RecursiveFindBD(disk)
1603   except errors.BlockDeviceError, err:
1604     _Fail("Failed to find device: %s", err, exc=True)
1605
1606   if rbd is None:
1607     return None
1608
1609   return rbd.GetSyncStatus()
1610
1611
1612 def BlockdevGetsize(disks):
1613   """Computes the size of the given disks.
1614
1615   If a disk is not found, returns None instead.
1616
1617   @type disks: list of L{objects.Disk}
1618   @param disks: the list of disk to compute the size for
1619   @rtype: list
1620   @return: list with elements None if the disk cannot be found,
1621       otherwise the size
1622
1623   """
1624   result = []
1625   for cf in disks:
1626     try:
1627       rbd = _RecursiveFindBD(cf)
1628     except errors.BlockDeviceError:
1629       result.append(None)
1630       continue
1631     if rbd is None:
1632       result.append(None)
1633     else:
1634       result.append(rbd.GetActualSize())
1635   return result
1636
1637
1638 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1639   """Export a block device to a remote node.
1640
1641   @type disk: L{objects.Disk}
1642   @param disk: the description of the disk to export
1643   @type dest_node: str
1644   @param dest_node: the destination node to export to
1645   @type dest_path: str
1646   @param dest_path: the destination path on the target node
1647   @type cluster_name: str
1648   @param cluster_name: the cluster name, needed for SSH hostalias
1649   @rtype: None
1650
1651   """
1652   real_disk = _OpenRealBD(disk)
1653
1654   # the block size on the read dd is 1MiB to match our units
1655   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1656                                "dd if=%s bs=1048576 count=%s",
1657                                real_disk.dev_path, str(disk.size))
1658
1659   # we set here a smaller block size as, due to ssh buffering, more
1660   # than 64-128k will mostly ignored; we use nocreat to fail if the
1661   # device is not already there or we pass a wrong path; we use
1662   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1663   # to not buffer too much memory; this means that at best, we flush
1664   # every 64k, which will not be very fast
1665   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1666                                 " oflag=dsync", dest_path)
1667
1668   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1669                                                    constants.GANETI_RUNAS,
1670                                                    destcmd)
1671
1672   # all commands have been checked, so we're safe to combine them
1673   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1674
1675   result = utils.RunCmd(["bash", "-c", command])
1676
1677   if result.failed:
1678     _Fail("Disk copy command '%s' returned error: %s"
1679           " output: %s", command, result.fail_reason, result.output)
1680
1681
1682 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1683   """Write a file to the filesystem.
1684
1685   This allows the master to overwrite(!) a file. It will only perform
1686   the operation if the file belongs to a list of configuration files.
1687
1688   @type file_name: str
1689   @param file_name: the target file name
1690   @type data: str
1691   @param data: the new contents of the file
1692   @type mode: int
1693   @param mode: the mode to give the file (can be None)
1694   @type uid: int
1695   @param uid: the owner of the file (can be -1 for default)
1696   @type gid: int
1697   @param gid: the group of the file (can be -1 for default)
1698   @type atime: float
1699   @param atime: the atime to set on the file (can be None)
1700   @type mtime: float
1701   @param mtime: the mtime to set on the file (can be None)
1702   @rtype: None
1703
1704   """
1705   if not os.path.isabs(file_name):
1706     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1707
1708   if file_name not in _ALLOWED_UPLOAD_FILES:
1709     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1710           file_name)
1711
1712   raw_data = _Decompress(data)
1713
1714   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1715                   atime=atime, mtime=mtime)
1716
1717
1718 def WriteSsconfFiles(values):
1719   """Update all ssconf files.
1720
1721   Wrapper around the SimpleStore.WriteFiles.
1722
1723   """
1724   ssconf.SimpleStore().WriteFiles(values)
1725
1726
1727 def _ErrnoOrStr(err):
1728   """Format an EnvironmentError exception.
1729
1730   If the L{err} argument has an errno attribute, it will be looked up
1731   and converted into a textual C{E...} description. Otherwise the
1732   string representation of the error will be returned.
1733
1734   @type err: L{EnvironmentError}
1735   @param err: the exception to format
1736
1737   """
1738   if hasattr(err, 'errno'):
1739     detail = errno.errorcode[err.errno]
1740   else:
1741     detail = str(err)
1742   return detail
1743
1744
1745 def _OSOndiskAPIVersion(os_dir):
1746   """Compute and return the API version of a given OS.
1747
1748   This function will try to read the API version of the OS residing in
1749   the 'os_dir' directory.
1750
1751   @type os_dir: str
1752   @param os_dir: the directory in which we should look for the OS
1753   @rtype: tuple
1754   @return: tuple (status, data) with status denoting the validity and
1755       data holding either the vaid versions or an error message
1756
1757   """
1758   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1759
1760   try:
1761     st = os.stat(api_file)
1762   except EnvironmentError, err:
1763     return False, ("Required file '%s' not found under path %s: %s" %
1764                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1765
1766   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1767     return False, ("File '%s' in %s is not a regular file" %
1768                    (constants.OS_API_FILE, os_dir))
1769
1770   try:
1771     api_versions = utils.ReadFile(api_file).splitlines()
1772   except EnvironmentError, err:
1773     return False, ("Error while reading the API version file at %s: %s" %
1774                    (api_file, _ErrnoOrStr(err)))
1775
1776   try:
1777     api_versions = [int(version.strip()) for version in api_versions]
1778   except (TypeError, ValueError), err:
1779     return False, ("API version(s) can't be converted to integer: %s" %
1780                    str(err))
1781
1782   return True, api_versions
1783
1784
1785 def DiagnoseOS(top_dirs=None):
1786   """Compute the validity for all OSes.
1787
1788   @type top_dirs: list
1789   @param top_dirs: the list of directories in which to
1790       search (if not given defaults to
1791       L{constants.OS_SEARCH_PATH})
1792   @rtype: list of L{objects.OS}
1793   @return: a list of tuples (name, path, status, diagnose, variants,
1794       parameters, api_version) for all (potential) OSes under all
1795       search paths, where:
1796           - name is the (potential) OS name
1797           - path is the full path to the OS
1798           - status True/False is the validity of the OS
1799           - diagnose is the error message for an invalid OS, otherwise empty
1800           - variants is a list of supported OS variants, if any
1801           - parameters is a list of (name, help) parameters, if any
1802           - api_version is a list of support OS API versions
1803
1804   """
1805   if top_dirs is None:
1806     top_dirs = constants.OS_SEARCH_PATH
1807
1808   result = []
1809   for dir_name in top_dirs:
1810     if os.path.isdir(dir_name):
1811       try:
1812         f_names = utils.ListVisibleFiles(dir_name)
1813       except EnvironmentError, err:
1814         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1815         break
1816       for name in f_names:
1817         os_path = utils.PathJoin(dir_name, name)
1818         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1819         if status:
1820           diagnose = ""
1821           variants = os_inst.supported_variants
1822           parameters = os_inst.supported_parameters
1823           api_versions = os_inst.api_versions
1824         else:
1825           diagnose = os_inst
1826           variants = parameters = api_versions = []
1827         result.append((name, os_path, status, diagnose, variants,
1828                        parameters, api_versions))
1829
1830   return result
1831
1832
1833 def _TryOSFromDisk(name, base_dir=None):
1834   """Create an OS instance from disk.
1835
1836   This function will return an OS instance if the given name is a
1837   valid OS name.
1838
1839   @type base_dir: string
1840   @keyword base_dir: Base directory containing OS installations.
1841                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1842   @rtype: tuple
1843   @return: success and either the OS instance if we find a valid one,
1844       or error message
1845
1846   """
1847   if base_dir is None:
1848     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1849   else:
1850     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1851
1852   if os_dir is None:
1853     return False, "Directory for OS %s not found in search path" % name
1854
1855   status, api_versions = _OSOndiskAPIVersion(os_dir)
1856   if not status:
1857     # push the error up
1858     return status, api_versions
1859
1860   if not constants.OS_API_VERSIONS.intersection(api_versions):
1861     return False, ("API version mismatch for path '%s': found %s, want %s." %
1862                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1863
1864   # OS Files dictionary, we will populate it with the absolute path names
1865   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1866
1867   if max(api_versions) >= constants.OS_API_V15:
1868     os_files[constants.OS_VARIANTS_FILE] = ''
1869
1870   if max(api_versions) >= constants.OS_API_V20:
1871     os_files[constants.OS_PARAMETERS_FILE] = ''
1872   else:
1873     del os_files[constants.OS_SCRIPT_VERIFY]
1874
1875   for filename in os_files:
1876     os_files[filename] = utils.PathJoin(os_dir, filename)
1877
1878     try:
1879       st = os.stat(os_files[filename])
1880     except EnvironmentError, err:
1881       return False, ("File '%s' under path '%s' is missing (%s)" %
1882                      (filename, os_dir, _ErrnoOrStr(err)))
1883
1884     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1885       return False, ("File '%s' under path '%s' is not a regular file" %
1886                      (filename, os_dir))
1887
1888     if filename in constants.OS_SCRIPTS:
1889       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1890         return False, ("File '%s' under path '%s' is not executable" %
1891                        (filename, os_dir))
1892
1893   variants = []
1894   if constants.OS_VARIANTS_FILE in os_files:
1895     variants_file = os_files[constants.OS_VARIANTS_FILE]
1896     try:
1897       variants = utils.ReadFile(variants_file).splitlines()
1898     except EnvironmentError, err:
1899       return False, ("Error while reading the OS variants file at %s: %s" %
1900                      (variants_file, _ErrnoOrStr(err)))
1901     if not variants:
1902       return False, ("No supported os variant found")
1903
1904   parameters = []
1905   if constants.OS_PARAMETERS_FILE in os_files:
1906     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1907     try:
1908       parameters = utils.ReadFile(parameters_file).splitlines()
1909     except EnvironmentError, err:
1910       return False, ("Error while reading the OS parameters file at %s: %s" %
1911                      (parameters_file, _ErrnoOrStr(err)))
1912     parameters = [v.split(None, 1) for v in parameters]
1913
1914   os_obj = objects.OS(name=name, path=os_dir,
1915                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1916                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1917                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1918                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1919                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1920                                                  None),
1921                       supported_variants=variants,
1922                       supported_parameters=parameters,
1923                       api_versions=api_versions)
1924   return True, os_obj
1925
1926
1927 def OSFromDisk(name, base_dir=None):
1928   """Create an OS instance from disk.
1929
1930   This function will return an OS instance if the given name is a
1931   valid OS name. Otherwise, it will raise an appropriate
1932   L{RPCFail} exception, detailing why this is not a valid OS.
1933
1934   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1935   an exception but returns true/false status data.
1936
1937   @type base_dir: string
1938   @keyword base_dir: Base directory containing OS installations.
1939                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1940   @rtype: L{objects.OS}
1941   @return: the OS instance if we find a valid one
1942   @raise RPCFail: if we don't find a valid OS
1943
1944   """
1945   name_only = name.split("+", 1)[0]
1946   status, payload = _TryOSFromDisk(name_only, base_dir)
1947
1948   if not status:
1949     _Fail(payload)
1950
1951   return payload
1952
1953
1954 def OSCoreEnv(inst_os, os_params, debug=0):
1955   """Calculate the basic environment for an os script.
1956
1957   @type inst_os: L{objects.OS}
1958   @param inst_os: operating system for which the environment is being built
1959   @type os_params: dict
1960   @param os_params: the OS parameters
1961   @type debug: integer
1962   @param debug: debug level (0 or 1, for OS Api 10)
1963   @rtype: dict
1964   @return: dict of environment variables
1965   @raise errors.BlockDeviceError: if the block device
1966       cannot be found
1967
1968   """
1969   result = {}
1970   api_version = \
1971     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1972   result['OS_API_VERSION'] = '%d' % api_version
1973   result['OS_NAME'] = inst_os.name
1974   result['DEBUG_LEVEL'] = '%d' % debug
1975
1976   # OS variants
1977   if api_version >= constants.OS_API_V15:
1978     try:
1979       variant = inst_os.name.split('+', 1)[1]
1980     except IndexError:
1981       variant = inst_os.supported_variants[0]
1982     result['OS_VARIANT'] = variant
1983
1984   # OS params
1985   for pname, pvalue in os_params.items():
1986     result['OSP_%s' % pname.upper()] = pvalue
1987
1988   return result
1989
1990
1991 def OSEnvironment(instance, inst_os, debug=0):
1992   """Calculate the environment for an os script.
1993
1994   @type instance: L{objects.Instance}
1995   @param instance: target instance for the os script run
1996   @type inst_os: L{objects.OS}
1997   @param inst_os: operating system for which the environment is being built
1998   @type debug: integer
1999   @param debug: debug level (0 or 1, for OS Api 10)
2000   @rtype: dict
2001   @return: dict of environment variables
2002   @raise errors.BlockDeviceError: if the block device
2003       cannot be found
2004
2005   """
2006   result = OSCoreEnv(inst_os, instance.osparams, debug=debug)
2007
2008   result['INSTANCE_NAME'] = instance.name
2009   result['INSTANCE_OS'] = instance.os
2010   result['HYPERVISOR'] = instance.hypervisor
2011   result['DISK_COUNT'] = '%d' % len(instance.disks)
2012   result['NIC_COUNT'] = '%d' % len(instance.nics)
2013
2014   # Disks
2015   for idx, disk in enumerate(instance.disks):
2016     real_disk = _OpenRealBD(disk)
2017     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2018     result['DISK_%d_ACCESS' % idx] = disk.mode
2019     if constants.HV_DISK_TYPE in instance.hvparams:
2020       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2021         instance.hvparams[constants.HV_DISK_TYPE]
2022     if disk.dev_type in constants.LDS_BLOCK:
2023       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2024     elif disk.dev_type == constants.LD_FILE:
2025       result['DISK_%d_BACKEND_TYPE' % idx] = \
2026         'file:%s' % disk.physical_id[0]
2027
2028   # NICs
2029   for idx, nic in enumerate(instance.nics):
2030     result['NIC_%d_MAC' % idx] = nic.mac
2031     if nic.ip:
2032       result['NIC_%d_IP' % idx] = nic.ip
2033     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2034     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2035       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2036     if nic.nicparams[constants.NIC_LINK]:
2037       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2038     if constants.HV_NIC_TYPE in instance.hvparams:
2039       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2040         instance.hvparams[constants.HV_NIC_TYPE]
2041
2042   # HV/BE params
2043   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2044     for key, value in source.items():
2045       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2046
2047   return result
2048
2049
2050 def BlockdevGrow(disk, amount):
2051   """Grow a stack of block devices.
2052
2053   This function is called recursively, with the childrens being the
2054   first ones to resize.
2055
2056   @type disk: L{objects.Disk}
2057   @param disk: the disk to be grown
2058   @rtype: (status, result)
2059   @return: a tuple with the status of the operation
2060       (True/False), and the errors message if status
2061       is False
2062
2063   """
2064   r_dev = _RecursiveFindBD(disk)
2065   if r_dev is None:
2066     _Fail("Cannot find block device %s", disk)
2067
2068   try:
2069     r_dev.Grow(amount)
2070   except errors.BlockDeviceError, err:
2071     _Fail("Failed to grow block device: %s", err, exc=True)
2072
2073
2074 def BlockdevSnapshot(disk):
2075   """Create a snapshot copy of a block device.
2076
2077   This function is called recursively, and the snapshot is actually created
2078   just for the leaf lvm backend device.
2079
2080   @type disk: L{objects.Disk}
2081   @param disk: the disk to be snapshotted
2082   @rtype: string
2083   @return: snapshot disk path
2084
2085   """
2086   if disk.dev_type == constants.LD_DRBD8:
2087     if not disk.children:
2088       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2089             disk.unique_id)
2090     return BlockdevSnapshot(disk.children[0])
2091   elif disk.dev_type == constants.LD_LV:
2092     r_dev = _RecursiveFindBD(disk)
2093     if r_dev is not None:
2094       # FIXME: choose a saner value for the snapshot size
2095       # let's stay on the safe side and ask for the full size, for now
2096       return r_dev.Snapshot(disk.size)
2097     else:
2098       _Fail("Cannot find block device %s", disk)
2099   else:
2100     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2101           disk.unique_id, disk.dev_type)
2102
2103
2104 def FinalizeExport(instance, snap_disks):
2105   """Write out the export configuration information.
2106
2107   @type instance: L{objects.Instance}
2108   @param instance: the instance which we export, used for
2109       saving configuration
2110   @type snap_disks: list of L{objects.Disk}
2111   @param snap_disks: list of snapshot block devices, which
2112       will be used to get the actual name of the dump file
2113
2114   @rtype: None
2115
2116   """
2117   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2118   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2119
2120   config = objects.SerializableConfigParser()
2121
2122   config.add_section(constants.INISECT_EXP)
2123   config.set(constants.INISECT_EXP, 'version', '0')
2124   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2125   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2126   config.set(constants.INISECT_EXP, 'os', instance.os)
2127   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2128
2129   config.add_section(constants.INISECT_INS)
2130   config.set(constants.INISECT_INS, 'name', instance.name)
2131   config.set(constants.INISECT_INS, 'memory', '%d' %
2132              instance.beparams[constants.BE_MEMORY])
2133   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2134              instance.beparams[constants.BE_VCPUS])
2135   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2136   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2137
2138   nic_total = 0
2139   for nic_count, nic in enumerate(instance.nics):
2140     nic_total += 1
2141     config.set(constants.INISECT_INS, 'nic%d_mac' %
2142                nic_count, '%s' % nic.mac)
2143     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2144     for param in constants.NICS_PARAMETER_TYPES:
2145       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2146                  '%s' % nic.nicparams.get(param, None))
2147   # TODO: redundant: on load can read nics until it doesn't exist
2148   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2149
2150   disk_total = 0
2151   for disk_count, disk in enumerate(snap_disks):
2152     if disk:
2153       disk_total += 1
2154       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2155                  ('%s' % disk.iv_name))
2156       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2157                  ('%s' % disk.physical_id[1]))
2158       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2159                  ('%d' % disk.size))
2160
2161   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2162
2163   # New-style hypervisor/backend parameters
2164
2165   config.add_section(constants.INISECT_HYP)
2166   for name, value in instance.hvparams.items():
2167     if name not in constants.HVC_GLOBALS:
2168       config.set(constants.INISECT_HYP, name, str(value))
2169
2170   config.add_section(constants.INISECT_BEP)
2171   for name, value in instance.beparams.items():
2172     config.set(constants.INISECT_BEP, name, str(value))
2173
2174   config.add_section(constants.INISECT_OSP)
2175   for name, value in instance.osparams.items():
2176     config.set(constants.INISECT_OSP, name, str(value))
2177
2178   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2179                   data=config.Dumps())
2180   shutil.rmtree(finaldestdir, ignore_errors=True)
2181   shutil.move(destdir, finaldestdir)
2182
2183
2184 def ExportInfo(dest):
2185   """Get export configuration information.
2186
2187   @type dest: str
2188   @param dest: directory containing the export
2189
2190   @rtype: L{objects.SerializableConfigParser}
2191   @return: a serializable config file containing the
2192       export info
2193
2194   """
2195   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2196
2197   config = objects.SerializableConfigParser()
2198   config.read(cff)
2199
2200   if (not config.has_section(constants.INISECT_EXP) or
2201       not config.has_section(constants.INISECT_INS)):
2202     _Fail("Export info file doesn't have the required fields")
2203
2204   return config.Dumps()
2205
2206
2207 def ListExports():
2208   """Return a list of exports currently available on this machine.
2209
2210   @rtype: list
2211   @return: list of the exports
2212
2213   """
2214   if os.path.isdir(constants.EXPORT_DIR):
2215     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2216   else:
2217     _Fail("No exports directory")
2218
2219
2220 def RemoveExport(export):
2221   """Remove an existing export from the node.
2222
2223   @type export: str
2224   @param export: the name of the export to remove
2225   @rtype: None
2226
2227   """
2228   target = utils.PathJoin(constants.EXPORT_DIR, export)
2229
2230   try:
2231     shutil.rmtree(target)
2232   except EnvironmentError, err:
2233     _Fail("Error while removing the export: %s", err, exc=True)
2234
2235
2236 def BlockdevRename(devlist):
2237   """Rename a list of block devices.
2238
2239   @type devlist: list of tuples
2240   @param devlist: list of tuples of the form  (disk,
2241       new_logical_id, new_physical_id); disk is an
2242       L{objects.Disk} object describing the current disk,
2243       and new logical_id/physical_id is the name we
2244       rename it to
2245   @rtype: boolean
2246   @return: True if all renames succeeded, False otherwise
2247
2248   """
2249   msgs = []
2250   result = True
2251   for disk, unique_id in devlist:
2252     dev = _RecursiveFindBD(disk)
2253     if dev is None:
2254       msgs.append("Can't find device %s in rename" % str(disk))
2255       result = False
2256       continue
2257     try:
2258       old_rpath = dev.dev_path
2259       dev.Rename(unique_id)
2260       new_rpath = dev.dev_path
2261       if old_rpath != new_rpath:
2262         DevCacheManager.RemoveCache(old_rpath)
2263         # FIXME: we should add the new cache information here, like:
2264         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2265         # but we don't have the owner here - maybe parse from existing
2266         # cache? for now, we only lose lvm data when we rename, which
2267         # is less critical than DRBD or MD
2268     except errors.BlockDeviceError, err:
2269       msgs.append("Can't rename device '%s' to '%s': %s" %
2270                   (dev, unique_id, err))
2271       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2272       result = False
2273   if not result:
2274     _Fail("; ".join(msgs))
2275
2276
2277 def _TransformFileStorageDir(file_storage_dir):
2278   """Checks whether given file_storage_dir is valid.
2279
2280   Checks wheter the given file_storage_dir is within the cluster-wide
2281   default file_storage_dir stored in SimpleStore. Only paths under that
2282   directory are allowed.
2283
2284   @type file_storage_dir: str
2285   @param file_storage_dir: the path to check
2286
2287   @return: the normalized path if valid, None otherwise
2288
2289   """
2290   if not constants.ENABLE_FILE_STORAGE:
2291     _Fail("File storage disabled at configure time")
2292   cfg = _GetConfig()
2293   file_storage_dir = os.path.normpath(file_storage_dir)
2294   base_file_storage_dir = cfg.GetFileStorageDir()
2295   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2296       base_file_storage_dir):
2297     _Fail("File storage directory '%s' is not under base file"
2298           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2299   return file_storage_dir
2300
2301
2302 def CreateFileStorageDir(file_storage_dir):
2303   """Create file storage directory.
2304
2305   @type file_storage_dir: str
2306   @param file_storage_dir: directory to create
2307
2308   @rtype: tuple
2309   @return: tuple with first element a boolean indicating wheter dir
2310       creation was successful or not
2311
2312   """
2313   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2314   if os.path.exists(file_storage_dir):
2315     if not os.path.isdir(file_storage_dir):
2316       _Fail("Specified storage dir '%s' is not a directory",
2317             file_storage_dir)
2318   else:
2319     try:
2320       os.makedirs(file_storage_dir, 0750)
2321     except OSError, err:
2322       _Fail("Cannot create file storage directory '%s': %s",
2323             file_storage_dir, err, exc=True)
2324
2325
2326 def RemoveFileStorageDir(file_storage_dir):
2327   """Remove file storage directory.
2328
2329   Remove it only if it's empty. If not log an error and return.
2330
2331   @type file_storage_dir: str
2332   @param file_storage_dir: the directory we should cleanup
2333   @rtype: tuple (success,)
2334   @return: tuple of one element, C{success}, denoting
2335       whether the operation was successful
2336
2337   """
2338   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2339   if os.path.exists(file_storage_dir):
2340     if not os.path.isdir(file_storage_dir):
2341       _Fail("Specified Storage directory '%s' is not a directory",
2342             file_storage_dir)
2343     # deletes dir only if empty, otherwise we want to fail the rpc call
2344     try:
2345       os.rmdir(file_storage_dir)
2346     except OSError, err:
2347       _Fail("Cannot remove file storage directory '%s': %s",
2348             file_storage_dir, err)
2349
2350
2351 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2352   """Rename the file storage directory.
2353
2354   @type old_file_storage_dir: str
2355   @param old_file_storage_dir: the current path
2356   @type new_file_storage_dir: str
2357   @param new_file_storage_dir: the name we should rename to
2358   @rtype: tuple (success,)
2359   @return: tuple of one element, C{success}, denoting
2360       whether the operation was successful
2361
2362   """
2363   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2364   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2365   if not os.path.exists(new_file_storage_dir):
2366     if os.path.isdir(old_file_storage_dir):
2367       try:
2368         os.rename(old_file_storage_dir, new_file_storage_dir)
2369       except OSError, err:
2370         _Fail("Cannot rename '%s' to '%s': %s",
2371               old_file_storage_dir, new_file_storage_dir, err)
2372     else:
2373       _Fail("Specified storage dir '%s' is not a directory",
2374             old_file_storage_dir)
2375   else:
2376     if os.path.exists(old_file_storage_dir):
2377       _Fail("Cannot rename '%s' to '%s': both locations exist",
2378             old_file_storage_dir, new_file_storage_dir)
2379
2380
2381 def _EnsureJobQueueFile(file_name):
2382   """Checks whether the given filename is in the queue directory.
2383
2384   @type file_name: str
2385   @param file_name: the file name we should check
2386   @rtype: None
2387   @raises RPCFail: if the file is not valid
2388
2389   """
2390   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2391   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2392
2393   if not result:
2394     _Fail("Passed job queue file '%s' does not belong to"
2395           " the queue directory '%s'", file_name, queue_dir)
2396
2397
2398 def JobQueueUpdate(file_name, content):
2399   """Updates a file in the queue directory.
2400
2401   This is just a wrapper over L{utils.WriteFile}, with proper
2402   checking.
2403
2404   @type file_name: str
2405   @param file_name: the job file name
2406   @type content: str
2407   @param content: the new job contents
2408   @rtype: boolean
2409   @return: the success of the operation
2410
2411   """
2412   _EnsureJobQueueFile(file_name)
2413
2414   # Write and replace the file atomically
2415   utils.WriteFile(file_name, data=_Decompress(content))
2416
2417
2418 def JobQueueRename(old, new):
2419   """Renames a job queue file.
2420
2421   This is just a wrapper over os.rename with proper checking.
2422
2423   @type old: str
2424   @param old: the old (actual) file name
2425   @type new: str
2426   @param new: the desired file name
2427   @rtype: tuple
2428   @return: the success of the operation and payload
2429
2430   """
2431   _EnsureJobQueueFile(old)
2432   _EnsureJobQueueFile(new)
2433
2434   utils.RenameFile(old, new, mkdir=True)
2435
2436
2437 def BlockdevClose(instance_name, disks):
2438   """Closes the given block devices.
2439
2440   This means they will be switched to secondary mode (in case of
2441   DRBD).
2442
2443   @param instance_name: if the argument is not empty, the symlinks
2444       of this instance will be removed
2445   @type disks: list of L{objects.Disk}
2446   @param disks: the list of disks to be closed
2447   @rtype: tuple (success, message)
2448   @return: a tuple of success and message, where success
2449       indicates the succes of the operation, and message
2450       which will contain the error details in case we
2451       failed
2452
2453   """
2454   bdevs = []
2455   for cf in disks:
2456     rd = _RecursiveFindBD(cf)
2457     if rd is None:
2458       _Fail("Can't find device %s", cf)
2459     bdevs.append(rd)
2460
2461   msg = []
2462   for rd in bdevs:
2463     try:
2464       rd.Close()
2465     except errors.BlockDeviceError, err:
2466       msg.append(str(err))
2467   if msg:
2468     _Fail("Can't make devices secondary: %s", ",".join(msg))
2469   else:
2470     if instance_name:
2471       _RemoveBlockDevLinks(instance_name, disks)
2472
2473
2474 def ValidateHVParams(hvname, hvparams):
2475   """Validates the given hypervisor parameters.
2476
2477   @type hvname: string
2478   @param hvname: the hypervisor name
2479   @type hvparams: dict
2480   @param hvparams: the hypervisor parameters to be validated
2481   @rtype: None
2482
2483   """
2484   try:
2485     hv_type = hypervisor.GetHypervisor(hvname)
2486     hv_type.ValidateParameters(hvparams)
2487   except errors.HypervisorError, err:
2488     _Fail(str(err), log=False)
2489
2490
2491 def _CheckOSPList(os_obj, parameters):
2492   """Check whether a list of parameters is supported by the OS.
2493
2494   @type os_obj: L{objects.OS}
2495   @param os_obj: OS object to check
2496   @type parameters: list
2497   @param parameters: the list of parameters to check
2498
2499   """
2500   supported = [v[0] for v in os_obj.supported_parameters]
2501   delta = frozenset(parameters).difference(supported)
2502   if delta:
2503     _Fail("The following parameters are not supported"
2504           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2505
2506
2507 def ValidateOS(required, osname, checks, osparams):
2508   """Validate the given OS' parameters.
2509
2510   @type required: boolean
2511   @param required: whether absence of the OS should translate into
2512       failure or not
2513   @type osname: string
2514   @param osname: the OS to be validated
2515   @type checks: list
2516   @param checks: list of the checks to run (currently only 'parameters')
2517   @type osparams: dict
2518   @param osparams: dictionary with OS parameters
2519   @rtype: boolean
2520   @return: True if the validation passed, or False if the OS was not
2521       found and L{required} was false
2522
2523   """
2524   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2525     _Fail("Unknown checks required for OS %s: %s", osname,
2526           set(checks).difference(constants.OS_VALIDATE_CALLS))
2527
2528   name_only = osname.split("+", 1)[0]
2529   status, tbv = _TryOSFromDisk(name_only, None)
2530
2531   if not status:
2532     if required:
2533       _Fail(tbv)
2534     else:
2535       return False
2536
2537   if max(tbv.api_versions) < constants.OS_API_V20:
2538     return True
2539
2540   if constants.OS_VALIDATE_PARAMETERS in checks:
2541     _CheckOSPList(tbv, osparams.keys())
2542
2543   validate_env = OSCoreEnv(tbv, osparams)
2544   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2545                         cwd=tbv.path)
2546   if result.failed:
2547     logging.error("os validate command '%s' returned error: %s output: %s",
2548                   result.cmd, result.fail_reason, result.output)
2549     _Fail("OS validation script failed (%s), output: %s",
2550           result.fail_reason, result.output, log=False)
2551
2552   return True
2553
2554
2555 def DemoteFromMC():
2556   """Demotes the current node from master candidate role.
2557
2558   """
2559   # try to ensure we're not the master by mistake
2560   master, myself = ssconf.GetMasterAndMyself()
2561   if master == myself:
2562     _Fail("ssconf status shows I'm the master node, will not demote")
2563
2564   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2565   if not result.failed:
2566     _Fail("The master daemon is running, will not demote")
2567
2568   try:
2569     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2570       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2571   except EnvironmentError, err:
2572     if err.errno != errno.ENOENT:
2573       _Fail("Error while backing up cluster file: %s", err, exc=True)
2574
2575   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2576
2577
2578 def _GetX509Filenames(cryptodir, name):
2579   """Returns the full paths for the private key and certificate.
2580
2581   """
2582   return (utils.PathJoin(cryptodir, name),
2583           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2584           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2585
2586
2587 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2588   """Creates a new X509 certificate for SSL/TLS.
2589
2590   @type validity: int
2591   @param validity: Validity in seconds
2592   @rtype: tuple; (string, string)
2593   @return: Certificate name and public part
2594
2595   """
2596   (key_pem, cert_pem) = \
2597     utils.GenerateSelfSignedX509Cert(netutils.HostInfo.SysName(),
2598                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2599
2600   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2601                               prefix="x509-%s-" % utils.TimestampForFilename())
2602   try:
2603     name = os.path.basename(cert_dir)
2604     assert len(name) > 5
2605
2606     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2607
2608     utils.WriteFile(key_file, mode=0400, data=key_pem)
2609     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2610
2611     # Never return private key as it shouldn't leave the node
2612     return (name, cert_pem)
2613   except Exception:
2614     shutil.rmtree(cert_dir, ignore_errors=True)
2615     raise
2616
2617
2618 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2619   """Removes a X509 certificate.
2620
2621   @type name: string
2622   @param name: Certificate name
2623
2624   """
2625   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2626
2627   utils.RemoveFile(key_file)
2628   utils.RemoveFile(cert_file)
2629
2630   try:
2631     os.rmdir(cert_dir)
2632   except EnvironmentError, err:
2633     _Fail("Cannot remove certificate directory '%s': %s",
2634           cert_dir, err)
2635
2636
2637 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2638   """Returns the command for the requested input/output.
2639
2640   @type instance: L{objects.Instance}
2641   @param instance: The instance object
2642   @param mode: Import/export mode
2643   @param ieio: Input/output type
2644   @param ieargs: Input/output arguments
2645
2646   """
2647   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2648
2649   env = None
2650   prefix = None
2651   suffix = None
2652   exp_size = None
2653
2654   if ieio == constants.IEIO_FILE:
2655     (filename, ) = ieargs
2656
2657     if not utils.IsNormAbsPath(filename):
2658       _Fail("Path '%s' is not normalized or absolute", filename)
2659
2660     directory = os.path.normpath(os.path.dirname(filename))
2661
2662     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2663         constants.EXPORT_DIR):
2664       _Fail("File '%s' is not under exports directory '%s'",
2665             filename, constants.EXPORT_DIR)
2666
2667     # Create directory
2668     utils.Makedirs(directory, mode=0750)
2669
2670     quoted_filename = utils.ShellQuote(filename)
2671
2672     if mode == constants.IEM_IMPORT:
2673       suffix = "> %s" % quoted_filename
2674     elif mode == constants.IEM_EXPORT:
2675       suffix = "< %s" % quoted_filename
2676
2677       # Retrieve file size
2678       try:
2679         st = os.stat(filename)
2680       except EnvironmentError, err:
2681         logging.error("Can't stat(2) %s: %s", filename, err)
2682       else:
2683         exp_size = utils.BytesToMebibyte(st.st_size)
2684
2685   elif ieio == constants.IEIO_RAW_DISK:
2686     (disk, ) = ieargs
2687
2688     real_disk = _OpenRealBD(disk)
2689
2690     if mode == constants.IEM_IMPORT:
2691       # we set here a smaller block size as, due to transport buffering, more
2692       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2693       # is not already there or we pass a wrong path; we use notrunc to no
2694       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2695       # much memory; this means that at best, we flush every 64k, which will
2696       # not be very fast
2697       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2698                                     " bs=%s oflag=dsync"),
2699                                     real_disk.dev_path,
2700                                     str(64 * 1024))
2701
2702     elif mode == constants.IEM_EXPORT:
2703       # the block size on the read dd is 1MiB to match our units
2704       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2705                                    real_disk.dev_path,
2706                                    str(1024 * 1024), # 1 MB
2707                                    str(disk.size))
2708       exp_size = disk.size
2709
2710   elif ieio == constants.IEIO_SCRIPT:
2711     (disk, disk_index, ) = ieargs
2712
2713     assert isinstance(disk_index, (int, long))
2714
2715     real_disk = _OpenRealBD(disk)
2716
2717     inst_os = OSFromDisk(instance.os)
2718     env = OSEnvironment(instance, inst_os)
2719
2720     if mode == constants.IEM_IMPORT:
2721       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2722       env["IMPORT_INDEX"] = str(disk_index)
2723       script = inst_os.import_script
2724
2725     elif mode == constants.IEM_EXPORT:
2726       env["EXPORT_DEVICE"] = real_disk.dev_path
2727       env["EXPORT_INDEX"] = str(disk_index)
2728       script = inst_os.export_script
2729
2730     # TODO: Pass special environment only to script
2731     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2732
2733     if mode == constants.IEM_IMPORT:
2734       suffix = "| %s" % script_cmd
2735
2736     elif mode == constants.IEM_EXPORT:
2737       prefix = "%s |" % script_cmd
2738
2739     # Let script predict size
2740     exp_size = constants.IE_CUSTOM_SIZE
2741
2742   else:
2743     _Fail("Invalid %s I/O mode %r", mode, ieio)
2744
2745   return (env, prefix, suffix, exp_size)
2746
2747
2748 def _CreateImportExportStatusDir(prefix):
2749   """Creates status directory for import/export.
2750
2751   """
2752   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2753                           prefix=("%s-%s-" %
2754                                   (prefix, utils.TimestampForFilename())))
2755
2756
2757 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2758   """Starts an import or export daemon.
2759
2760   @param mode: Import/output mode
2761   @type opts: L{objects.ImportExportOptions}
2762   @param opts: Daemon options
2763   @type host: string
2764   @param host: Remote host for export (None for import)
2765   @type port: int
2766   @param port: Remote port for export (None for import)
2767   @type instance: L{objects.Instance}
2768   @param instance: Instance object
2769   @param ieio: Input/output type
2770   @param ieioargs: Input/output arguments
2771
2772   """
2773   if mode == constants.IEM_IMPORT:
2774     prefix = "import"
2775
2776     if not (host is None and port is None):
2777       _Fail("Can not specify host or port on import")
2778
2779   elif mode == constants.IEM_EXPORT:
2780     prefix = "export"
2781
2782     if host is None or port is None:
2783       _Fail("Host and port must be specified for an export")
2784
2785   else:
2786     _Fail("Invalid mode %r", mode)
2787
2788   if (opts.key_name is None) ^ (opts.ca_pem is None):
2789     _Fail("Cluster certificate can only be used for both key and CA")
2790
2791   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2792     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2793
2794   if opts.key_name is None:
2795     # Use server.pem
2796     key_path = constants.NODED_CERT_FILE
2797     cert_path = constants.NODED_CERT_FILE
2798     assert opts.ca_pem is None
2799   else:
2800     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2801                                                  opts.key_name)
2802     assert opts.ca_pem is not None
2803
2804   for i in [key_path, cert_path]:
2805     if not os.path.exists(i):
2806       _Fail("File '%s' does not exist" % i)
2807
2808   status_dir = _CreateImportExportStatusDir(prefix)
2809   try:
2810     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2811     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2812     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2813
2814     if opts.ca_pem is None:
2815       # Use server.pem
2816       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2817     else:
2818       ca = opts.ca_pem
2819
2820     # Write CA file
2821     utils.WriteFile(ca_file, data=ca, mode=0400)
2822
2823     cmd = [
2824       constants.IMPORT_EXPORT_DAEMON,
2825       status_file, mode,
2826       "--key=%s" % key_path,
2827       "--cert=%s" % cert_path,
2828       "--ca=%s" % ca_file,
2829       ]
2830
2831     if host:
2832       cmd.append("--host=%s" % host)
2833
2834     if port:
2835       cmd.append("--port=%s" % port)
2836
2837     if opts.compress:
2838       cmd.append("--compress=%s" % opts.compress)
2839
2840     if opts.magic:
2841       cmd.append("--magic=%s" % opts.magic)
2842
2843     if exp_size is not None:
2844       cmd.append("--expected-size=%s" % exp_size)
2845
2846     if cmd_prefix:
2847       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2848
2849     if cmd_suffix:
2850       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2851
2852     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2853
2854     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2855     # support for receiving a file descriptor for output
2856     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2857                       output=logfile)
2858
2859     # The import/export name is simply the status directory name
2860     return os.path.basename(status_dir)
2861
2862   except Exception:
2863     shutil.rmtree(status_dir, ignore_errors=True)
2864     raise
2865
2866
2867 def GetImportExportStatus(names):
2868   """Returns import/export daemon status.
2869
2870   @type names: sequence
2871   @param names: List of names
2872   @rtype: List of dicts
2873   @return: Returns a list of the state of each named import/export or None if a
2874            status couldn't be read
2875
2876   """
2877   result = []
2878
2879   for name in names:
2880     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2881                                  _IES_STATUS_FILE)
2882
2883     try:
2884       data = utils.ReadFile(status_file)
2885     except EnvironmentError, err:
2886       if err.errno != errno.ENOENT:
2887         raise
2888       data = None
2889
2890     if not data:
2891       result.append(None)
2892       continue
2893
2894     result.append(serializer.LoadJson(data))
2895
2896   return result
2897
2898
2899 def AbortImportExport(name):
2900   """Sends SIGTERM to a running import/export daemon.
2901
2902   """
2903   logging.info("Abort import/export %s", name)
2904
2905   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2906   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2907
2908   if pid:
2909     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2910                  name, pid)
2911     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2912
2913
2914 def CleanupImportExport(name):
2915   """Cleanup after an import or export.
2916
2917   If the import/export daemon is still running it's killed. Afterwards the
2918   whole status directory is removed.
2919
2920   """
2921   logging.info("Finalizing import/export %s", name)
2922
2923   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2924
2925   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2926
2927   if pid:
2928     logging.info("Import/export %s is still running with PID %s",
2929                  name, pid)
2930     utils.KillProcess(pid, waitpid=False)
2931
2932   shutil.rmtree(status_dir, ignore_errors=True)
2933
2934
2935 def _FindDisks(nodes_ip, disks):
2936   """Sets the physical ID on disks and returns the block devices.
2937
2938   """
2939   # set the correct physical ID
2940   my_name = netutils.HostInfo().name
2941   for cf in disks:
2942     cf.SetPhysicalID(my_name, nodes_ip)
2943
2944   bdevs = []
2945
2946   for cf in disks:
2947     rd = _RecursiveFindBD(cf)
2948     if rd is None:
2949       _Fail("Can't find device %s", cf)
2950     bdevs.append(rd)
2951   return bdevs
2952
2953
2954 def DrbdDisconnectNet(nodes_ip, disks):
2955   """Disconnects the network on a list of drbd devices.
2956
2957   """
2958   bdevs = _FindDisks(nodes_ip, disks)
2959
2960   # disconnect disks
2961   for rd in bdevs:
2962     try:
2963       rd.DisconnectNet()
2964     except errors.BlockDeviceError, err:
2965       _Fail("Can't change network configuration to standalone mode: %s",
2966             err, exc=True)
2967
2968
2969 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2970   """Attaches the network on a list of drbd devices.
2971
2972   """
2973   bdevs = _FindDisks(nodes_ip, disks)
2974
2975   if multimaster:
2976     for idx, rd in enumerate(bdevs):
2977       try:
2978         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2979       except EnvironmentError, err:
2980         _Fail("Can't create symlink: %s", err)
2981   # reconnect disks, switch to new master configuration and if
2982   # needed primary mode
2983   for rd in bdevs:
2984     try:
2985       rd.AttachNet(multimaster)
2986     except errors.BlockDeviceError, err:
2987       _Fail("Can't change network configuration: %s", err)
2988
2989   # wait until the disks are connected; we need to retry the re-attach
2990   # if the device becomes standalone, as this might happen if the one
2991   # node disconnects and reconnects in a different mode before the
2992   # other node reconnects; in this case, one or both of the nodes will
2993   # decide it has wrong configuration and switch to standalone
2994
2995   def _Attach():
2996     all_connected = True
2997
2998     for rd in bdevs:
2999       stats = rd.GetProcStatus()
3000
3001       all_connected = (all_connected and
3002                        (stats.is_connected or stats.is_in_resync))
3003
3004       if stats.is_standalone:
3005         # peer had different config info and this node became
3006         # standalone, even though this should not happen with the
3007         # new staged way of changing disk configs
3008         try:
3009           rd.AttachNet(multimaster)
3010         except errors.BlockDeviceError, err:
3011           _Fail("Can't change network configuration: %s", err)
3012
3013     if not all_connected:
3014       raise utils.RetryAgain()
3015
3016   try:
3017     # Start with a delay of 100 miliseconds and go up to 5 seconds
3018     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3019   except utils.RetryTimeout:
3020     _Fail("Timeout in disk reconnecting")
3021
3022   if multimaster:
3023     # change to primary mode
3024     for rd in bdevs:
3025       try:
3026         rd.Open()
3027       except errors.BlockDeviceError, err:
3028         _Fail("Can't change to primary mode: %s", err)
3029
3030
3031 def DrbdWaitSync(nodes_ip, disks):
3032   """Wait until DRBDs have synchronized.
3033
3034   """
3035   def _helper(rd):
3036     stats = rd.GetProcStatus()
3037     if not (stats.is_connected or stats.is_in_resync):
3038       raise utils.RetryAgain()
3039     return stats
3040
3041   bdevs = _FindDisks(nodes_ip, disks)
3042
3043   min_resync = 100
3044   alldone = True
3045   for rd in bdevs:
3046     try:
3047       # poll each second for 15 seconds
3048       stats = utils.Retry(_helper, 1, 15, args=[rd])
3049     except utils.RetryTimeout:
3050       stats = rd.GetProcStatus()
3051       # last check
3052       if not (stats.is_connected or stats.is_in_resync):
3053         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3054     alldone = alldone and (not stats.is_in_resync)
3055     if stats.sync_percent is not None:
3056       min_resync = min(min_resync, stats.sync_percent)
3057
3058   return (alldone, min_resync)
3059
3060
3061 def GetDrbdUsermodeHelper():
3062   """Returns DRBD usermode helper currently configured.
3063
3064   """
3065   try:
3066     return bdev.BaseDRBD.GetUsermodeHelper()
3067   except errors.BlockDeviceError, err:
3068     _Fail(str(err))
3069
3070
3071 def PowercycleNode(hypervisor_type):
3072   """Hard-powercycle the node.
3073
3074   Because we need to return first, and schedule the powercycle in the
3075   background, we won't be able to report failures nicely.
3076
3077   """
3078   hyper = hypervisor.GetHypervisor(hypervisor_type)
3079   try:
3080     pid = os.fork()
3081   except OSError:
3082     # if we can't fork, we'll pretend that we're in the child process
3083     pid = 0
3084   if pid > 0:
3085     return "Reboot scheduled in 5 seconds"
3086   # ensure the child is running on ram
3087   try:
3088     utils.Mlockall()
3089   except Exception: # pylint: disable-msg=W0703
3090     pass
3091   time.sleep(5)
3092   hyper.PowercycleNode()
3093
3094
3095 class HooksRunner(object):
3096   """Hook runner.
3097
3098   This class is instantiated on the node side (ganeti-noded) and not
3099   on the master side.
3100
3101   """
3102   def __init__(self, hooks_base_dir=None):
3103     """Constructor for hooks runner.
3104
3105     @type hooks_base_dir: str or None
3106     @param hooks_base_dir: if not None, this overrides the
3107         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3108
3109     """
3110     if hooks_base_dir is None:
3111       hooks_base_dir = constants.HOOKS_BASE_DIR
3112     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3113     # constant
3114     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3115
3116   def RunHooks(self, hpath, phase, env):
3117     """Run the scripts in the hooks directory.
3118
3119     @type hpath: str
3120     @param hpath: the path to the hooks directory which
3121         holds the scripts
3122     @type phase: str
3123     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3124         L{constants.HOOKS_PHASE_POST}
3125     @type env: dict
3126     @param env: dictionary with the environment for the hook
3127     @rtype: list
3128     @return: list of 3-element tuples:
3129       - script path
3130       - script result, either L{constants.HKR_SUCCESS} or
3131         L{constants.HKR_FAIL}
3132       - output of the script
3133
3134     @raise errors.ProgrammerError: for invalid input
3135         parameters
3136
3137     """
3138     if phase == constants.HOOKS_PHASE_PRE:
3139       suffix = "pre"
3140     elif phase == constants.HOOKS_PHASE_POST:
3141       suffix = "post"
3142     else:
3143       _Fail("Unknown hooks phase '%s'", phase)
3144
3145
3146     subdir = "%s-%s.d" % (hpath, suffix)
3147     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3148
3149     results = []
3150
3151     if not os.path.isdir(dir_name):
3152       # for non-existing/non-dirs, we simply exit instead of logging a
3153       # warning at every operation
3154       return results
3155
3156     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3157
3158     for (relname, relstatus, runresult)  in runparts_results:
3159       if relstatus == constants.RUNPARTS_SKIP:
3160         rrval = constants.HKR_SKIP
3161         output = ""
3162       elif relstatus == constants.RUNPARTS_ERR:
3163         rrval = constants.HKR_FAIL
3164         output = "Hook script execution error: %s" % runresult
3165       elif relstatus == constants.RUNPARTS_RUN:
3166         if runresult.failed:
3167           rrval = constants.HKR_FAIL
3168         else:
3169           rrval = constants.HKR_SUCCESS
3170         output = utils.SafeEncode(runresult.output.strip())
3171       results.append(("%s/%s" % (subdir, relname), rrval, output))
3172
3173     return results
3174
3175
3176 class IAllocatorRunner(object):
3177   """IAllocator runner.
3178
3179   This class is instantiated on the node side (ganeti-noded) and not on
3180   the master side.
3181
3182   """
3183   @staticmethod
3184   def Run(name, idata):
3185     """Run an iallocator script.
3186
3187     @type name: str
3188     @param name: the iallocator script name
3189     @type idata: str
3190     @param idata: the allocator input data
3191
3192     @rtype: tuple
3193     @return: two element tuple of:
3194        - status
3195        - either error message or stdout of allocator (for success)
3196
3197     """
3198     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3199                                   os.path.isfile)
3200     if alloc_script is None:
3201       _Fail("iallocator module '%s' not found in the search path", name)
3202
3203     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3204     try:
3205       os.write(fd, idata)
3206       os.close(fd)
3207       result = utils.RunCmd([alloc_script, fin_name])
3208       if result.failed:
3209         _Fail("iallocator module '%s' failed: %s, output '%s'",
3210               name, result.fail_reason, result.output)
3211     finally:
3212       os.unlink(fin_name)
3213
3214     return result.stdout
3215
3216
3217 class DevCacheManager(object):
3218   """Simple class for managing a cache of block device information.
3219
3220   """
3221   _DEV_PREFIX = "/dev/"
3222   _ROOT_DIR = constants.BDEV_CACHE_DIR
3223
3224   @classmethod
3225   def _ConvertPath(cls, dev_path):
3226     """Converts a /dev/name path to the cache file name.
3227
3228     This replaces slashes with underscores and strips the /dev
3229     prefix. It then returns the full path to the cache file.
3230
3231     @type dev_path: str
3232     @param dev_path: the C{/dev/} path name
3233     @rtype: str
3234     @return: the converted path name
3235
3236     """
3237     if dev_path.startswith(cls._DEV_PREFIX):
3238       dev_path = dev_path[len(cls._DEV_PREFIX):]
3239     dev_path = dev_path.replace("/", "_")
3240     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3241     return fpath
3242
3243   @classmethod
3244   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3245     """Updates the cache information for a given device.
3246
3247     @type dev_path: str
3248     @param dev_path: the pathname of the device
3249     @type owner: str
3250     @param owner: the owner (instance name) of the device
3251     @type on_primary: bool
3252     @param on_primary: whether this is the primary
3253         node nor not
3254     @type iv_name: str
3255     @param iv_name: the instance-visible name of the
3256         device, as in objects.Disk.iv_name
3257
3258     @rtype: None
3259
3260     """
3261     if dev_path is None:
3262       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3263       return
3264     fpath = cls._ConvertPath(dev_path)
3265     if on_primary:
3266       state = "primary"
3267     else:
3268       state = "secondary"
3269     if iv_name is None:
3270       iv_name = "not_visible"
3271     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3272     try:
3273       utils.WriteFile(fpath, data=fdata)
3274     except EnvironmentError, err:
3275       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3276
3277   @classmethod
3278   def RemoveCache(cls, dev_path):
3279     """Remove data for a dev_path.
3280
3281     This is just a wrapper over L{utils.RemoveFile} with a converted
3282     path name and logging.
3283
3284     @type dev_path: str
3285     @param dev_path: the pathname of the device
3286
3287     @rtype: None
3288
3289     """
3290     if dev_path is None:
3291       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3292       return
3293     fpath = cls._ConvertPath(dev_path)
3294     try:
3295       utils.RemoveFile(fpath)
3296     except EnvironmentError, err:
3297       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)