Merge branch 'devel-2.1'
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61
62
63 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
64 _ALLOWED_CLEAN_DIRS = frozenset([
65   constants.DATA_DIR,
66   constants.JOB_QUEUE_ARCHIVE_DIR,
67   constants.QUEUE_DIR,
68   constants.CRYPTO_KEYS_DIR,
69   ])
70 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
71 _X509_KEY_FILE = "key"
72 _X509_CERT_FILE = "cert"
73 _IES_STATUS_FILE = "status"
74 _IES_PID_FILE = "pid"
75 _IES_CA_FILE = "ca"
76
77
78 class RPCFail(Exception):
79   """Class denoting RPC failure.
80
81   Its argument is the error message.
82
83   """
84
85
86 def _Fail(msg, *args, **kwargs):
87   """Log an error and the raise an RPCFail exception.
88
89   This exception is then handled specially in the ganeti daemon and
90   turned into a 'failed' return type. As such, this function is a
91   useful shortcut for logging the error and returning it to the master
92   daemon.
93
94   @type msg: string
95   @param msg: the text of the exception
96   @raise RPCFail
97
98   """
99   if args:
100     msg = msg % args
101   if "log" not in kwargs or kwargs["log"]: # if we should log this error
102     if "exc" in kwargs and kwargs["exc"]:
103       logging.exception(msg)
104     else:
105       logging.error(msg)
106   raise RPCFail(msg)
107
108
109 def _GetConfig():
110   """Simple wrapper to return a SimpleStore.
111
112   @rtype: L{ssconf.SimpleStore}
113   @return: a SimpleStore instance
114
115   """
116   return ssconf.SimpleStore()
117
118
119 def _GetSshRunner(cluster_name):
120   """Simple wrapper to return an SshRunner.
121
122   @type cluster_name: str
123   @param cluster_name: the cluster name, which is needed
124       by the SshRunner constructor
125   @rtype: L{ssh.SshRunner}
126   @return: an SshRunner instance
127
128   """
129   return ssh.SshRunner(cluster_name)
130
131
132 def _Decompress(data):
133   """Unpacks data compressed by the RPC client.
134
135   @type data: list or tuple
136   @param data: Data sent by RPC client
137   @rtype: str
138   @return: Decompressed data
139
140   """
141   assert isinstance(data, (list, tuple))
142   assert len(data) == 2
143   (encoding, content) = data
144   if encoding == constants.RPC_ENCODING_NONE:
145     return content
146   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
147     return zlib.decompress(base64.b64decode(content))
148   else:
149     raise AssertionError("Unknown data encoding")
150
151
152 def _CleanDirectory(path, exclude=None):
153   """Removes all regular files in a directory.
154
155   @type path: str
156   @param path: the directory to clean
157   @type exclude: list
158   @param exclude: list of files to be excluded, defaults
159       to the empty list
160
161   """
162   if path not in _ALLOWED_CLEAN_DIRS:
163     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
164           path)
165
166   if not os.path.isdir(path):
167     return
168   if exclude is None:
169     exclude = []
170   else:
171     # Normalize excluded paths
172     exclude = [os.path.normpath(i) for i in exclude]
173
174   for rel_name in utils.ListVisibleFiles(path):
175     full_name = utils.PathJoin(path, rel_name)
176     if full_name in exclude:
177       continue
178     if os.path.isfile(full_name) and not os.path.islink(full_name):
179       utils.RemoveFile(full_name)
180
181
182 def _BuildUploadFileList():
183   """Build the list of allowed upload files.
184
185   This is abstracted so that it's built only once at module import time.
186
187   """
188   allowed_files = set([
189     constants.CLUSTER_CONF_FILE,
190     constants.ETC_HOSTS,
191     constants.SSH_KNOWN_HOSTS_FILE,
192     constants.VNC_PASSWORD_FILE,
193     constants.RAPI_CERT_FILE,
194     constants.RAPI_USERS_FILE,
195     constants.CONFD_HMAC_KEY,
196     ])
197
198   for hv_name in constants.HYPER_TYPES:
199     hv_class = hypervisor.GetHypervisorClass(hv_name)
200     allowed_files.update(hv_class.GetAncillaryFiles())
201
202   return frozenset(allowed_files)
203
204
205 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
206
207
208 def JobQueuePurge():
209   """Removes job queue files and archived jobs.
210
211   @rtype: tuple
212   @return: True, None
213
214   """
215   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
216   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
217
218
219 def GetMasterInfo():
220   """Returns master information.
221
222   This is an utility function to compute master information, either
223   for consumption here or from the node daemon.
224
225   @rtype: tuple
226   @return: master_netdev, master_ip, master_name
227   @raise RPCFail: in case of errors
228
229   """
230   try:
231     cfg = _GetConfig()
232     master_netdev = cfg.GetMasterNetdev()
233     master_ip = cfg.GetMasterIP()
234     master_node = cfg.GetMasterNode()
235   except errors.ConfigurationError, err:
236     _Fail("Cluster configuration incomplete: %s", err, exc=True)
237   return (master_netdev, master_ip, master_node)
238
239
240 def StartMaster(start_daemons, no_voting):
241   """Activate local node as master node.
242
243   The function will always try activate the IP address of the master
244   (unless someone else has it). It will also start the master daemons,
245   based on the start_daemons parameter.
246
247   @type start_daemons: boolean
248   @param start_daemons: whether to also start the master
249       daemons (ganeti-masterd and ganeti-rapi)
250   @type no_voting: boolean
251   @param no_voting: whether to start ganeti-masterd without a node vote
252       (if start_daemons is True), but still non-interactively
253   @rtype: None
254
255   """
256   # GetMasterInfo will raise an exception if not able to return data
257   master_netdev, master_ip, _ = GetMasterInfo()
258
259   err_msgs = []
260   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
261     if utils.OwnIpAddress(master_ip):
262       # we already have the ip:
263       logging.debug("Master IP already configured, doing nothing")
264     else:
265       msg = "Someone else has the master ip, not activating"
266       logging.error(msg)
267       err_msgs.append(msg)
268   else:
269     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
270                            "dev", master_netdev, "label",
271                            "%s:0" % master_netdev])
272     if result.failed:
273       msg = "Can't activate master IP: %s" % result.output
274       logging.error(msg)
275       err_msgs.append(msg)
276
277     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
278                            "-s", master_ip, master_ip])
279     # we'll ignore the exit code of arping
280
281   # and now start the master and rapi daemons
282   if start_daemons:
283     if no_voting:
284       masterd_args = "--no-voting --yes-do-it"
285     else:
286       masterd_args = ""
287
288     env = {
289       "EXTRA_MASTERD_ARGS": masterd_args,
290       }
291
292     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
293     if result.failed:
294       msg = "Can't start Ganeti master: %s" % result.output
295       logging.error(msg)
296       err_msgs.append(msg)
297
298   if err_msgs:
299     _Fail("; ".join(err_msgs))
300
301
302 def StopMaster(stop_daemons):
303   """Deactivate this node as master.
304
305   The function will always try to deactivate the IP address of the
306   master. It will also stop the master daemons depending on the
307   stop_daemons parameter.
308
309   @type stop_daemons: boolean
310   @param stop_daemons: whether to also stop the master daemons
311       (ganeti-masterd and ganeti-rapi)
312   @rtype: None
313
314   """
315   # TODO: log and report back to the caller the error failures; we
316   # need to decide in which case we fail the RPC for this
317
318   # GetMasterInfo will raise an exception if not able to return data
319   master_netdev, master_ip, _ = GetMasterInfo()
320
321   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
322                          "dev", master_netdev])
323   if result.failed:
324     logging.error("Can't remove the master IP, error: %s", result.output)
325     # but otherwise ignore the failure
326
327   if stop_daemons:
328     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
329     if result.failed:
330       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
331                     " and error %s",
332                     result.cmd, result.exit_code, result.output)
333
334
335 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
336   """Joins this node to the cluster.
337
338   This does the following:
339       - updates the hostkeys of the machine (rsa and dsa)
340       - adds the ssh private key to the user
341       - adds the ssh public key to the users' authorized_keys file
342
343   @type dsa: str
344   @param dsa: the DSA private key to write
345   @type dsapub: str
346   @param dsapub: the DSA public key to write
347   @type rsa: str
348   @param rsa: the RSA private key to write
349   @type rsapub: str
350   @param rsapub: the RSA public key to write
351   @type sshkey: str
352   @param sshkey: the SSH private key to write
353   @type sshpub: str
354   @param sshpub: the SSH public key to write
355   @rtype: boolean
356   @return: the success of the operation
357
358   """
359   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
360                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
361                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
362                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
363   for name, content, mode in sshd_keys:
364     utils.WriteFile(name, data=content, mode=mode)
365
366   try:
367     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
368                                                     mkdir=True)
369   except errors.OpExecError, err:
370     _Fail("Error while processing user ssh files: %s", err, exc=True)
371
372   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
373     utils.WriteFile(name, data=content, mode=0600)
374
375   utils.AddAuthorizedKey(auth_keys, sshpub)
376
377   result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
378   if result.failed:
379     _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
380           result.cmd, result.exit_code, result.output)
381
382
383 def LeaveCluster(modify_ssh_setup):
384   """Cleans up and remove the current node.
385
386   This function cleans up and prepares the current node to be removed
387   from the cluster.
388
389   If processing is successful, then it raises an
390   L{errors.QuitGanetiException} which is used as a special case to
391   shutdown the node daemon.
392
393   @param modify_ssh_setup: boolean
394
395   """
396   _CleanDirectory(constants.DATA_DIR)
397   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
398   JobQueuePurge()
399
400   if modify_ssh_setup:
401     try:
402       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
403
404       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
405
406       utils.RemoveFile(priv_key)
407       utils.RemoveFile(pub_key)
408     except errors.OpExecError:
409       logging.exception("Error while processing ssh files")
410
411   try:
412     utils.RemoveFile(constants.CONFD_HMAC_KEY)
413     utils.RemoveFile(constants.RAPI_CERT_FILE)
414     utils.RemoveFile(constants.NODED_CERT_FILE)
415   except: # pylint: disable-msg=W0702
416     logging.exception("Error while removing cluster secrets")
417
418   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
419   if result.failed:
420     logging.error("Command %s failed with exitcode %s and error %s",
421                   result.cmd, result.exit_code, result.output)
422
423   # Raise a custom exception (handled in ganeti-noded)
424   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
425
426
427 def GetNodeInfo(vgname, hypervisor_type):
428   """Gives back a hash with different information about the node.
429
430   @type vgname: C{string}
431   @param vgname: the name of the volume group to ask for disk space information
432   @type hypervisor_type: C{str}
433   @param hypervisor_type: the name of the hypervisor to ask for
434       memory information
435   @rtype: C{dict}
436   @return: dictionary with the following keys:
437       - vg_size is the size of the configured volume group in MiB
438       - vg_free is the free size of the volume group in MiB
439       - memory_dom0 is the memory allocated for domain0 in MiB
440       - memory_free is the currently available (free) ram in MiB
441       - memory_total is the total number of ram in MiB
442
443   """
444   outputarray = {}
445   vginfo = _GetVGInfo(vgname)
446   outputarray['vg_size'] = vginfo['vg_size']
447   outputarray['vg_free'] = vginfo['vg_free']
448
449   hyper = hypervisor.GetHypervisor(hypervisor_type)
450   hyp_info = hyper.GetNodeInfo()
451   if hyp_info is not None:
452     outputarray.update(hyp_info)
453
454   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
455
456   return outputarray
457
458
459 def VerifyNode(what, cluster_name):
460   """Verify the status of the local node.
461
462   Based on the input L{what} parameter, various checks are done on the
463   local node.
464
465   If the I{filelist} key is present, this list of
466   files is checksummed and the file/checksum pairs are returned.
467
468   If the I{nodelist} key is present, we check that we have
469   connectivity via ssh with the target nodes (and check the hostname
470   report).
471
472   If the I{node-net-test} key is present, we check that we have
473   connectivity to the given nodes via both primary IP and, if
474   applicable, secondary IPs.
475
476   @type what: C{dict}
477   @param what: a dictionary of things to check:
478       - filelist: list of files for which to compute checksums
479       - nodelist: list of nodes we should check ssh communication with
480       - node-net-test: list of nodes we should check node daemon port
481         connectivity with
482       - hypervisor: list with hypervisors to run the verify for
483   @rtype: dict
484   @return: a dictionary with the same keys as the input dict, and
485       values representing the result of the checks
486
487   """
488   result = {}
489   my_name = utils.HostInfo().name
490   port = utils.GetDaemonPort(constants.NODED)
491
492   if constants.NV_HYPERVISOR in what:
493     result[constants.NV_HYPERVISOR] = tmp = {}
494     for hv_name in what[constants.NV_HYPERVISOR]:
495       try:
496         val = hypervisor.GetHypervisor(hv_name).Verify()
497       except errors.HypervisorError, err:
498         val = "Error while checking hypervisor: %s" % str(err)
499       tmp[hv_name] = val
500
501   if constants.NV_FILELIST in what:
502     result[constants.NV_FILELIST] = utils.FingerprintFiles(
503       what[constants.NV_FILELIST])
504
505   if constants.NV_NODELIST in what:
506     result[constants.NV_NODELIST] = tmp = {}
507     random.shuffle(what[constants.NV_NODELIST])
508     for node in what[constants.NV_NODELIST]:
509       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
510       if not success:
511         tmp[node] = message
512
513   if constants.NV_NODENETTEST in what:
514     result[constants.NV_NODENETTEST] = tmp = {}
515     my_pip = my_sip = None
516     for name, pip, sip in what[constants.NV_NODENETTEST]:
517       if name == my_name:
518         my_pip = pip
519         my_sip = sip
520         break
521     if not my_pip:
522       tmp[my_name] = ("Can't find my own primary/secondary IP"
523                       " in the node list")
524     else:
525       for name, pip, sip in what[constants.NV_NODENETTEST]:
526         fail = []
527         if not utils.TcpPing(pip, port, source=my_pip):
528           fail.append("primary")
529         if sip != pip:
530           if not utils.TcpPing(sip, port, source=my_sip):
531             fail.append("secondary")
532         if fail:
533           tmp[name] = ("failure using the %s interface(s)" %
534                        " and ".join(fail))
535
536   if constants.NV_MASTERIP in what:
537     # FIXME: add checks on incoming data structures (here and in the
538     # rest of the function)
539     master_name, master_ip = what[constants.NV_MASTERIP]
540     if master_name == my_name:
541       source = constants.LOCALHOST_IP_ADDRESS
542     else:
543       source = None
544     result[constants.NV_MASTERIP] = utils.TcpPing(master_ip, port,
545                                                   source=source)
546
547   if constants.NV_LVLIST in what:
548     try:
549       val = GetVolumeList(what[constants.NV_LVLIST])
550     except RPCFail, err:
551       val = str(err)
552     result[constants.NV_LVLIST] = val
553
554   if constants.NV_INSTANCELIST in what:
555     # GetInstanceList can fail
556     try:
557       val = GetInstanceList(what[constants.NV_INSTANCELIST])
558     except RPCFail, err:
559       val = str(err)
560     result[constants.NV_INSTANCELIST] = val
561
562   if constants.NV_VGLIST in what:
563     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
564
565   if constants.NV_PVLIST in what:
566     result[constants.NV_PVLIST] = \
567       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
568                                    filter_allocatable=False)
569
570   if constants.NV_VERSION in what:
571     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
572                                     constants.RELEASE_VERSION)
573
574   if constants.NV_HVINFO in what:
575     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
576     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
577
578   if constants.NV_DRBDLIST in what:
579     try:
580       used_minors = bdev.DRBD8.GetUsedDevs().keys()
581     except errors.BlockDeviceError, err:
582       logging.warning("Can't get used minors list", exc_info=True)
583       used_minors = str(err)
584     result[constants.NV_DRBDLIST] = used_minors
585
586   if constants.NV_NODESETUP in what:
587     result[constants.NV_NODESETUP] = tmpr = []
588     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
589       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
590                   " under /sys, missing required directories /sys/block"
591                   " and /sys/class/net")
592     if (not os.path.isdir("/proc/sys") or
593         not os.path.isfile("/proc/sysrq-trigger")):
594       tmpr.append("The procfs filesystem doesn't seem to be mounted"
595                   " under /proc, missing required directory /proc/sys and"
596                   " the file /proc/sysrq-trigger")
597
598   if constants.NV_TIME in what:
599     result[constants.NV_TIME] = utils.SplitTime(time.time())
600
601   return result
602
603
604 def GetVolumeList(vg_name):
605   """Compute list of logical volumes and their size.
606
607   @type vg_name: str
608   @param vg_name: the volume group whose LVs we should list
609   @rtype: dict
610   @return:
611       dictionary of all partions (key) with value being a tuple of
612       their size (in MiB), inactive and online status::
613
614         {'test1': ('20.06', True, True)}
615
616       in case of errors, a string is returned with the error
617       details.
618
619   """
620   lvs = {}
621   sep = '|'
622   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
623                          "--separator=%s" % sep,
624                          "-olv_name,lv_size,lv_attr", vg_name])
625   if result.failed:
626     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
627
628   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
629   for line in result.stdout.splitlines():
630     line = line.strip()
631     match = valid_line_re.match(line)
632     if not match:
633       logging.error("Invalid line returned from lvs output: '%s'", line)
634       continue
635     name, size, attr = match.groups()
636     inactive = attr[4] == '-'
637     online = attr[5] == 'o'
638     virtual = attr[0] == 'v'
639     if virtual:
640       # we don't want to report such volumes as existing, since they
641       # don't really hold data
642       continue
643     lvs[name] = (size, inactive, online)
644
645   return lvs
646
647
648 def ListVolumeGroups():
649   """List the volume groups and their size.
650
651   @rtype: dict
652   @return: dictionary with keys volume name and values the
653       size of the volume
654
655   """
656   return utils.ListVolumeGroups()
657
658
659 def NodeVolumes():
660   """List all volumes on this node.
661
662   @rtype: list
663   @return:
664     A list of dictionaries, each having four keys:
665       - name: the logical volume name,
666       - size: the size of the logical volume
667       - dev: the physical device on which the LV lives
668       - vg: the volume group to which it belongs
669
670     In case of errors, we return an empty list and log the
671     error.
672
673     Note that since a logical volume can live on multiple physical
674     volumes, the resulting list might include a logical volume
675     multiple times.
676
677   """
678   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
679                          "--separator=|",
680                          "--options=lv_name,lv_size,devices,vg_name"])
681   if result.failed:
682     _Fail("Failed to list logical volumes, lvs output: %s",
683           result.output)
684
685   def parse_dev(dev):
686     return dev.split('(')[0]
687
688   def handle_dev(dev):
689     return [parse_dev(x) for x in dev.split(",")]
690
691   def map_line(line):
692     line = [v.strip() for v in line]
693     return [{'name': line[0], 'size': line[1],
694              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
695
696   all_devs = []
697   for line in result.stdout.splitlines():
698     if line.count('|') >= 3:
699       all_devs.extend(map_line(line.split('|')))
700     else:
701       logging.warning("Strange line in the output from lvs: '%s'", line)
702   return all_devs
703
704
705 def BridgesExist(bridges_list):
706   """Check if a list of bridges exist on the current node.
707
708   @rtype: boolean
709   @return: C{True} if all of them exist, C{False} otherwise
710
711   """
712   missing = []
713   for bridge in bridges_list:
714     if not utils.BridgeExists(bridge):
715       missing.append(bridge)
716
717   if missing:
718     _Fail("Missing bridges %s", utils.CommaJoin(missing))
719
720
721 def GetInstanceList(hypervisor_list):
722   """Provides a list of instances.
723
724   @type hypervisor_list: list
725   @param hypervisor_list: the list of hypervisors to query information
726
727   @rtype: list
728   @return: a list of all running instances on the current node
729     - instance1.example.com
730     - instance2.example.com
731
732   """
733   results = []
734   for hname in hypervisor_list:
735     try:
736       names = hypervisor.GetHypervisor(hname).ListInstances()
737       results.extend(names)
738     except errors.HypervisorError, err:
739       _Fail("Error enumerating instances (hypervisor %s): %s",
740             hname, err, exc=True)
741
742   return results
743
744
745 def GetInstanceInfo(instance, hname):
746   """Gives back the information about an instance as a dictionary.
747
748   @type instance: string
749   @param instance: the instance name
750   @type hname: string
751   @param hname: the hypervisor type of the instance
752
753   @rtype: dict
754   @return: dictionary with the following keys:
755       - memory: memory size of instance (int)
756       - state: xen state of instance (string)
757       - time: cpu time of instance (float)
758
759   """
760   output = {}
761
762   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
763   if iinfo is not None:
764     output['memory'] = iinfo[2]
765     output['state'] = iinfo[4]
766     output['time'] = iinfo[5]
767
768   return output
769
770
771 def GetInstanceMigratable(instance):
772   """Gives whether an instance can be migrated.
773
774   @type instance: L{objects.Instance}
775   @param instance: object representing the instance to be checked.
776
777   @rtype: tuple
778   @return: tuple of (result, description) where:
779       - result: whether the instance can be migrated or not
780       - description: a description of the issue, if relevant
781
782   """
783   hyper = hypervisor.GetHypervisor(instance.hypervisor)
784   iname = instance.name
785   if iname not in hyper.ListInstances():
786     _Fail("Instance %s is not running", iname)
787
788   for idx in range(len(instance.disks)):
789     link_name = _GetBlockDevSymlinkPath(iname, idx)
790     if not os.path.islink(link_name):
791       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
792
793
794 def GetAllInstancesInfo(hypervisor_list):
795   """Gather data about all instances.
796
797   This is the equivalent of L{GetInstanceInfo}, except that it
798   computes data for all instances at once, thus being faster if one
799   needs data about more than one instance.
800
801   @type hypervisor_list: list
802   @param hypervisor_list: list of hypervisors to query for instance data
803
804   @rtype: dict
805   @return: dictionary of instance: data, with data having the following keys:
806       - memory: memory size of instance (int)
807       - state: xen state of instance (string)
808       - time: cpu time of instance (float)
809       - vcpus: the number of vcpus
810
811   """
812   output = {}
813
814   for hname in hypervisor_list:
815     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
816     if iinfo:
817       for name, _, memory, vcpus, state, times in iinfo:
818         value = {
819           'memory': memory,
820           'vcpus': vcpus,
821           'state': state,
822           'time': times,
823           }
824         if name in output:
825           # we only check static parameters, like memory and vcpus,
826           # and not state and time which can change between the
827           # invocations of the different hypervisors
828           for key in 'memory', 'vcpus':
829             if value[key] != output[name][key]:
830               _Fail("Instance %s is running twice"
831                     " with different parameters", name)
832         output[name] = value
833
834   return output
835
836
837 def _InstanceLogName(kind, os_name, instance):
838   """Compute the OS log filename for a given instance and operation.
839
840   The instance name and os name are passed in as strings since not all
841   operations have these as part of an instance object.
842
843   @type kind: string
844   @param kind: the operation type (e.g. add, import, etc.)
845   @type os_name: string
846   @param os_name: the os name
847   @type instance: string
848   @param instance: the name of the instance being imported/added/etc.
849
850   """
851   # TODO: Use tempfile.mkstemp to create unique filename
852   base = ("%s-%s-%s-%s.log" %
853           (kind, os_name, instance, utils.TimestampForFilename()))
854   return utils.PathJoin(constants.LOG_OS_DIR, base)
855
856
857 def InstanceOsAdd(instance, reinstall, debug):
858   """Add an OS to an instance.
859
860   @type instance: L{objects.Instance}
861   @param instance: Instance whose OS is to be installed
862   @type reinstall: boolean
863   @param reinstall: whether this is an instance reinstall
864   @type debug: integer
865   @param debug: debug level, passed to the OS scripts
866   @rtype: None
867
868   """
869   inst_os = OSFromDisk(instance.os)
870
871   create_env = OSEnvironment(instance, inst_os, debug)
872   if reinstall:
873     create_env['INSTANCE_REINSTALL'] = "1"
874
875   logfile = _InstanceLogName("add", instance.os, instance.name)
876
877   result = utils.RunCmd([inst_os.create_script], env=create_env,
878                         cwd=inst_os.path, output=logfile,)
879   if result.failed:
880     logging.error("os create command '%s' returned error: %s, logfile: %s,"
881                   " output: %s", result.cmd, result.fail_reason, logfile,
882                   result.output)
883     lines = [utils.SafeEncode(val)
884              for val in utils.TailFile(logfile, lines=20)]
885     _Fail("OS create script failed (%s), last lines in the"
886           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
887
888
889 def RunRenameInstance(instance, old_name, debug):
890   """Run the OS rename script for an instance.
891
892   @type instance: L{objects.Instance}
893   @param instance: Instance whose OS is to be installed
894   @type old_name: string
895   @param old_name: previous instance name
896   @type debug: integer
897   @param debug: debug level, passed to the OS scripts
898   @rtype: boolean
899   @return: the success of the operation
900
901   """
902   inst_os = OSFromDisk(instance.os)
903
904   rename_env = OSEnvironment(instance, inst_os, debug)
905   rename_env['OLD_INSTANCE_NAME'] = old_name
906
907   logfile = _InstanceLogName("rename", instance.os,
908                              "%s-%s" % (old_name, instance.name))
909
910   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
911                         cwd=inst_os.path, output=logfile)
912
913   if result.failed:
914     logging.error("os create command '%s' returned error: %s output: %s",
915                   result.cmd, result.fail_reason, result.output)
916     lines = [utils.SafeEncode(val)
917              for val in utils.TailFile(logfile, lines=20)]
918     _Fail("OS rename script failed (%s), last lines in the"
919           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
920
921
922 def _GetVGInfo(vg_name):
923   """Get information about the volume group.
924
925   @type vg_name: str
926   @param vg_name: the volume group which we query
927   @rtype: dict
928   @return:
929     A dictionary with the following keys:
930       - C{vg_size} is the total size of the volume group in MiB
931       - C{vg_free} is the free size of the volume group in MiB
932       - C{pv_count} are the number of physical disks in that VG
933
934     If an error occurs during gathering of data, we return the same dict
935     with keys all set to None.
936
937   """
938   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
939
940   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
941                          "--nosuffix", "--units=m", "--separator=:", vg_name])
942
943   if retval.failed:
944     logging.error("volume group %s not present", vg_name)
945     return retdic
946   valarr = retval.stdout.strip().rstrip(':').split(':')
947   if len(valarr) == 3:
948     try:
949       retdic = {
950         "vg_size": int(round(float(valarr[0]), 0)),
951         "vg_free": int(round(float(valarr[1]), 0)),
952         "pv_count": int(valarr[2]),
953         }
954     except (TypeError, ValueError), err:
955       logging.exception("Fail to parse vgs output: %s", err)
956   else:
957     logging.error("vgs output has the wrong number of fields (expected"
958                   " three): %s", str(valarr))
959   return retdic
960
961
962 def _GetBlockDevSymlinkPath(instance_name, idx):
963   return utils.PathJoin(constants.DISK_LINKS_DIR,
964                         "%s:%d" % (instance_name, idx))
965
966
967 def _SymlinkBlockDev(instance_name, device_path, idx):
968   """Set up symlinks to a instance's block device.
969
970   This is an auxiliary function run when an instance is start (on the primary
971   node) or when an instance is migrated (on the target node).
972
973
974   @param instance_name: the name of the target instance
975   @param device_path: path of the physical block device, on the node
976   @param idx: the disk index
977   @return: absolute path to the disk's symlink
978
979   """
980   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
981   try:
982     os.symlink(device_path, link_name)
983   except OSError, err:
984     if err.errno == errno.EEXIST:
985       if (not os.path.islink(link_name) or
986           os.readlink(link_name) != device_path):
987         os.remove(link_name)
988         os.symlink(device_path, link_name)
989     else:
990       raise
991
992   return link_name
993
994
995 def _RemoveBlockDevLinks(instance_name, disks):
996   """Remove the block device symlinks belonging to the given instance.
997
998   """
999   for idx, _ in enumerate(disks):
1000     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1001     if os.path.islink(link_name):
1002       try:
1003         os.remove(link_name)
1004       except OSError:
1005         logging.exception("Can't remove symlink '%s'", link_name)
1006
1007
1008 def _GatherAndLinkBlockDevs(instance):
1009   """Set up an instance's block device(s).
1010
1011   This is run on the primary node at instance startup. The block
1012   devices must be already assembled.
1013
1014   @type instance: L{objects.Instance}
1015   @param instance: the instance whose disks we shoul assemble
1016   @rtype: list
1017   @return: list of (disk_object, device_path)
1018
1019   """
1020   block_devices = []
1021   for idx, disk in enumerate(instance.disks):
1022     device = _RecursiveFindBD(disk)
1023     if device is None:
1024       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1025                                     str(disk))
1026     device.Open()
1027     try:
1028       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1029     except OSError, e:
1030       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1031                                     e.strerror)
1032
1033     block_devices.append((disk, link_name))
1034
1035   return block_devices
1036
1037
1038 def StartInstance(instance):
1039   """Start an instance.
1040
1041   @type instance: L{objects.Instance}
1042   @param instance: the instance object
1043   @rtype: None
1044
1045   """
1046   running_instances = GetInstanceList([instance.hypervisor])
1047
1048   if instance.name in running_instances:
1049     logging.info("Instance %s already running, not starting", instance.name)
1050     return
1051
1052   try:
1053     block_devices = _GatherAndLinkBlockDevs(instance)
1054     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1055     hyper.StartInstance(instance, block_devices)
1056   except errors.BlockDeviceError, err:
1057     _Fail("Block device error: %s", err, exc=True)
1058   except errors.HypervisorError, err:
1059     _RemoveBlockDevLinks(instance.name, instance.disks)
1060     _Fail("Hypervisor error: %s", err, exc=True)
1061
1062
1063 def InstanceShutdown(instance, timeout):
1064   """Shut an instance down.
1065
1066   @note: this functions uses polling with a hardcoded timeout.
1067
1068   @type instance: L{objects.Instance}
1069   @param instance: the instance object
1070   @type timeout: integer
1071   @param timeout: maximum timeout for soft shutdown
1072   @rtype: None
1073
1074   """
1075   hv_name = instance.hypervisor
1076   hyper = hypervisor.GetHypervisor(hv_name)
1077   iname = instance.name
1078
1079   if instance.name not in hyper.ListInstances():
1080     logging.info("Instance %s not running, doing nothing", iname)
1081     return
1082
1083   class _TryShutdown:
1084     def __init__(self):
1085       self.tried_once = False
1086
1087     def __call__(self):
1088       if iname not in hyper.ListInstances():
1089         return
1090
1091       try:
1092         hyper.StopInstance(instance, retry=self.tried_once)
1093       except errors.HypervisorError, err:
1094         if iname not in hyper.ListInstances():
1095           # if the instance is no longer existing, consider this a
1096           # success and go to cleanup
1097           return
1098
1099         _Fail("Failed to stop instance %s: %s", iname, err)
1100
1101       self.tried_once = True
1102
1103       raise utils.RetryAgain()
1104
1105   try:
1106     utils.Retry(_TryShutdown(), 5, timeout)
1107   except utils.RetryTimeout:
1108     # the shutdown did not succeed
1109     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1110
1111     try:
1112       hyper.StopInstance(instance, force=True)
1113     except errors.HypervisorError, err:
1114       if iname in hyper.ListInstances():
1115         # only raise an error if the instance still exists, otherwise
1116         # the error could simply be "instance ... unknown"!
1117         _Fail("Failed to force stop instance %s: %s", iname, err)
1118
1119     time.sleep(1)
1120
1121     if iname in hyper.ListInstances():
1122       _Fail("Could not shutdown instance %s even by destroy", iname)
1123
1124   try:
1125     hyper.CleanupInstance(instance.name)
1126   except errors.HypervisorError, err:
1127     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1128
1129   _RemoveBlockDevLinks(iname, instance.disks)
1130
1131
1132 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1133   """Reboot an instance.
1134
1135   @type instance: L{objects.Instance}
1136   @param instance: the instance object to reboot
1137   @type reboot_type: str
1138   @param reboot_type: the type of reboot, one the following
1139     constants:
1140       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1141         instance OS, do not recreate the VM
1142       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1143         restart the VM (at the hypervisor level)
1144       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1145         not accepted here, since that mode is handled differently, in
1146         cmdlib, and translates into full stop and start of the
1147         instance (instead of a call_instance_reboot RPC)
1148   @type shutdown_timeout: integer
1149   @param shutdown_timeout: maximum timeout for soft shutdown
1150   @rtype: None
1151
1152   """
1153   running_instances = GetInstanceList([instance.hypervisor])
1154
1155   if instance.name not in running_instances:
1156     _Fail("Cannot reboot instance %s that is not running", instance.name)
1157
1158   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1159   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1160     try:
1161       hyper.RebootInstance(instance)
1162     except errors.HypervisorError, err:
1163       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1164   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1165     try:
1166       InstanceShutdown(instance, shutdown_timeout)
1167       return StartInstance(instance)
1168     except errors.HypervisorError, err:
1169       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1170   else:
1171     _Fail("Invalid reboot_type received: %s", reboot_type)
1172
1173
1174 def MigrationInfo(instance):
1175   """Gather information about an instance to be migrated.
1176
1177   @type instance: L{objects.Instance}
1178   @param instance: the instance definition
1179
1180   """
1181   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1182   try:
1183     info = hyper.MigrationInfo(instance)
1184   except errors.HypervisorError, err:
1185     _Fail("Failed to fetch migration information: %s", err, exc=True)
1186   return info
1187
1188
1189 def AcceptInstance(instance, info, target):
1190   """Prepare the node to accept an instance.
1191
1192   @type instance: L{objects.Instance}
1193   @param instance: the instance definition
1194   @type info: string/data (opaque)
1195   @param info: migration information, from the source node
1196   @type target: string
1197   @param target: target host (usually ip), on this node
1198
1199   """
1200   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1201   try:
1202     hyper.AcceptInstance(instance, info, target)
1203   except errors.HypervisorError, err:
1204     _Fail("Failed to accept instance: %s", err, exc=True)
1205
1206
1207 def FinalizeMigration(instance, info, success):
1208   """Finalize any preparation to accept an instance.
1209
1210   @type instance: L{objects.Instance}
1211   @param instance: the instance definition
1212   @type info: string/data (opaque)
1213   @param info: migration information, from the source node
1214   @type success: boolean
1215   @param success: whether the migration was a success or a failure
1216
1217   """
1218   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1219   try:
1220     hyper.FinalizeMigration(instance, info, success)
1221   except errors.HypervisorError, err:
1222     _Fail("Failed to finalize migration: %s", err, exc=True)
1223
1224
1225 def MigrateInstance(instance, target, live):
1226   """Migrates an instance to another node.
1227
1228   @type instance: L{objects.Instance}
1229   @param instance: the instance definition
1230   @type target: string
1231   @param target: the target node name
1232   @type live: boolean
1233   @param live: whether the migration should be done live or not (the
1234       interpretation of this parameter is left to the hypervisor)
1235   @rtype: tuple
1236   @return: a tuple of (success, msg) where:
1237       - succes is a boolean denoting the success/failure of the operation
1238       - msg is a string with details in case of failure
1239
1240   """
1241   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1242
1243   try:
1244     hyper.MigrateInstance(instance, target, live)
1245   except errors.HypervisorError, err:
1246     _Fail("Failed to migrate instance: %s", err, exc=True)
1247
1248
1249 def BlockdevCreate(disk, size, owner, on_primary, info):
1250   """Creates a block device for an instance.
1251
1252   @type disk: L{objects.Disk}
1253   @param disk: the object describing the disk we should create
1254   @type size: int
1255   @param size: the size of the physical underlying device, in MiB
1256   @type owner: str
1257   @param owner: the name of the instance for which disk is created,
1258       used for device cache data
1259   @type on_primary: boolean
1260   @param on_primary:  indicates if it is the primary node or not
1261   @type info: string
1262   @param info: string that will be sent to the physical device
1263       creation, used for example to set (LVM) tags on LVs
1264
1265   @return: the new unique_id of the device (this can sometime be
1266       computed only after creation), or None. On secondary nodes,
1267       it's not required to return anything.
1268
1269   """
1270   # TODO: remove the obsolete 'size' argument
1271   # pylint: disable-msg=W0613
1272   clist = []
1273   if disk.children:
1274     for child in disk.children:
1275       try:
1276         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1277       except errors.BlockDeviceError, err:
1278         _Fail("Can't assemble device %s: %s", child, err)
1279       if on_primary or disk.AssembleOnSecondary():
1280         # we need the children open in case the device itself has to
1281         # be assembled
1282         try:
1283           # pylint: disable-msg=E1103
1284           crdev.Open()
1285         except errors.BlockDeviceError, err:
1286           _Fail("Can't make child '%s' read-write: %s", child, err)
1287       clist.append(crdev)
1288
1289   try:
1290     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1291   except errors.BlockDeviceError, err:
1292     _Fail("Can't create block device: %s", err)
1293
1294   if on_primary or disk.AssembleOnSecondary():
1295     try:
1296       device.Assemble()
1297     except errors.BlockDeviceError, err:
1298       _Fail("Can't assemble device after creation, unusual event: %s", err)
1299     device.SetSyncSpeed(constants.SYNC_SPEED)
1300     if on_primary or disk.OpenOnSecondary():
1301       try:
1302         device.Open(force=True)
1303       except errors.BlockDeviceError, err:
1304         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1305     DevCacheManager.UpdateCache(device.dev_path, owner,
1306                                 on_primary, disk.iv_name)
1307
1308   device.SetInfo(info)
1309
1310   return device.unique_id
1311
1312
1313 def BlockdevRemove(disk):
1314   """Remove a block device.
1315
1316   @note: This is intended to be called recursively.
1317
1318   @type disk: L{objects.Disk}
1319   @param disk: the disk object we should remove
1320   @rtype: boolean
1321   @return: the success of the operation
1322
1323   """
1324   msgs = []
1325   try:
1326     rdev = _RecursiveFindBD(disk)
1327   except errors.BlockDeviceError, err:
1328     # probably can't attach
1329     logging.info("Can't attach to device %s in remove", disk)
1330     rdev = None
1331   if rdev is not None:
1332     r_path = rdev.dev_path
1333     try:
1334       rdev.Remove()
1335     except errors.BlockDeviceError, err:
1336       msgs.append(str(err))
1337     if not msgs:
1338       DevCacheManager.RemoveCache(r_path)
1339
1340   if disk.children:
1341     for child in disk.children:
1342       try:
1343         BlockdevRemove(child)
1344       except RPCFail, err:
1345         msgs.append(str(err))
1346
1347   if msgs:
1348     _Fail("; ".join(msgs))
1349
1350
1351 def _RecursiveAssembleBD(disk, owner, as_primary):
1352   """Activate a block device for an instance.
1353
1354   This is run on the primary and secondary nodes for an instance.
1355
1356   @note: this function is called recursively.
1357
1358   @type disk: L{objects.Disk}
1359   @param disk: the disk we try to assemble
1360   @type owner: str
1361   @param owner: the name of the instance which owns the disk
1362   @type as_primary: boolean
1363   @param as_primary: if we should make the block device
1364       read/write
1365
1366   @return: the assembled device or None (in case no device
1367       was assembled)
1368   @raise errors.BlockDeviceError: in case there is an error
1369       during the activation of the children or the device
1370       itself
1371
1372   """
1373   children = []
1374   if disk.children:
1375     mcn = disk.ChildrenNeeded()
1376     if mcn == -1:
1377       mcn = 0 # max number of Nones allowed
1378     else:
1379       mcn = len(disk.children) - mcn # max number of Nones
1380     for chld_disk in disk.children:
1381       try:
1382         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1383       except errors.BlockDeviceError, err:
1384         if children.count(None) >= mcn:
1385           raise
1386         cdev = None
1387         logging.error("Error in child activation (but continuing): %s",
1388                       str(err))
1389       children.append(cdev)
1390
1391   if as_primary or disk.AssembleOnSecondary():
1392     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1393     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1394     result = r_dev
1395     if as_primary or disk.OpenOnSecondary():
1396       r_dev.Open()
1397     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1398                                 as_primary, disk.iv_name)
1399
1400   else:
1401     result = True
1402   return result
1403
1404
1405 def BlockdevAssemble(disk, owner, as_primary):
1406   """Activate a block device for an instance.
1407
1408   This is a wrapper over _RecursiveAssembleBD.
1409
1410   @rtype: str or boolean
1411   @return: a C{/dev/...} path for primary nodes, and
1412       C{True} for secondary nodes
1413
1414   """
1415   try:
1416     result = _RecursiveAssembleBD(disk, owner, as_primary)
1417     if isinstance(result, bdev.BlockDev):
1418       # pylint: disable-msg=E1103
1419       result = result.dev_path
1420   except errors.BlockDeviceError, err:
1421     _Fail("Error while assembling disk: %s", err, exc=True)
1422
1423   return result
1424
1425
1426 def BlockdevShutdown(disk):
1427   """Shut down a block device.
1428
1429   First, if the device is assembled (Attach() is successful), then
1430   the device is shutdown. Then the children of the device are
1431   shutdown.
1432
1433   This function is called recursively. Note that we don't cache the
1434   children or such, as oppossed to assemble, shutdown of different
1435   devices doesn't require that the upper device was active.
1436
1437   @type disk: L{objects.Disk}
1438   @param disk: the description of the disk we should
1439       shutdown
1440   @rtype: None
1441
1442   """
1443   msgs = []
1444   r_dev = _RecursiveFindBD(disk)
1445   if r_dev is not None:
1446     r_path = r_dev.dev_path
1447     try:
1448       r_dev.Shutdown()
1449       DevCacheManager.RemoveCache(r_path)
1450     except errors.BlockDeviceError, err:
1451       msgs.append(str(err))
1452
1453   if disk.children:
1454     for child in disk.children:
1455       try:
1456         BlockdevShutdown(child)
1457       except RPCFail, err:
1458         msgs.append(str(err))
1459
1460   if msgs:
1461     _Fail("; ".join(msgs))
1462
1463
1464 def BlockdevAddchildren(parent_cdev, new_cdevs):
1465   """Extend a mirrored block device.
1466
1467   @type parent_cdev: L{objects.Disk}
1468   @param parent_cdev: the disk to which we should add children
1469   @type new_cdevs: list of L{objects.Disk}
1470   @param new_cdevs: the list of children which we should add
1471   @rtype: None
1472
1473   """
1474   parent_bdev = _RecursiveFindBD(parent_cdev)
1475   if parent_bdev is None:
1476     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1477   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1478   if new_bdevs.count(None) > 0:
1479     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1480   parent_bdev.AddChildren(new_bdevs)
1481
1482
1483 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1484   """Shrink a mirrored block device.
1485
1486   @type parent_cdev: L{objects.Disk}
1487   @param parent_cdev: the disk from which we should remove children
1488   @type new_cdevs: list of L{objects.Disk}
1489   @param new_cdevs: the list of children which we should remove
1490   @rtype: None
1491
1492   """
1493   parent_bdev = _RecursiveFindBD(parent_cdev)
1494   if parent_bdev is None:
1495     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1496   devs = []
1497   for disk in new_cdevs:
1498     rpath = disk.StaticDevPath()
1499     if rpath is None:
1500       bd = _RecursiveFindBD(disk)
1501       if bd is None:
1502         _Fail("Can't find device %s while removing children", disk)
1503       else:
1504         devs.append(bd.dev_path)
1505     else:
1506       if not utils.IsNormAbsPath(rpath):
1507         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1508       devs.append(rpath)
1509   parent_bdev.RemoveChildren(devs)
1510
1511
1512 def BlockdevGetmirrorstatus(disks):
1513   """Get the mirroring status of a list of devices.
1514
1515   @type disks: list of L{objects.Disk}
1516   @param disks: the list of disks which we should query
1517   @rtype: disk
1518   @return:
1519       a list of (mirror_done, estimated_time) tuples, which
1520       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1521   @raise errors.BlockDeviceError: if any of the disks cannot be
1522       found
1523
1524   """
1525   stats = []
1526   for dsk in disks:
1527     rbd = _RecursiveFindBD(dsk)
1528     if rbd is None:
1529       _Fail("Can't find device %s", dsk)
1530
1531     stats.append(rbd.CombinedSyncStatus())
1532
1533   return stats
1534
1535
1536 def _RecursiveFindBD(disk):
1537   """Check if a device is activated.
1538
1539   If so, return information about the real device.
1540
1541   @type disk: L{objects.Disk}
1542   @param disk: the disk object we need to find
1543
1544   @return: None if the device can't be found,
1545       otherwise the device instance
1546
1547   """
1548   children = []
1549   if disk.children:
1550     for chdisk in disk.children:
1551       children.append(_RecursiveFindBD(chdisk))
1552
1553   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1554
1555
1556 def _OpenRealBD(disk):
1557   """Opens the underlying block device of a disk.
1558
1559   @type disk: L{objects.Disk}
1560   @param disk: the disk object we want to open
1561
1562   """
1563   real_disk = _RecursiveFindBD(disk)
1564   if real_disk is None:
1565     _Fail("Block device '%s' is not set up", disk)
1566
1567   real_disk.Open()
1568
1569   return real_disk
1570
1571
1572 def BlockdevFind(disk):
1573   """Check if a device is activated.
1574
1575   If it is, return information about the real device.
1576
1577   @type disk: L{objects.Disk}
1578   @param disk: the disk to find
1579   @rtype: None or objects.BlockDevStatus
1580   @return: None if the disk cannot be found, otherwise a the current
1581            information
1582
1583   """
1584   try:
1585     rbd = _RecursiveFindBD(disk)
1586   except errors.BlockDeviceError, err:
1587     _Fail("Failed to find device: %s", err, exc=True)
1588
1589   if rbd is None:
1590     return None
1591
1592   return rbd.GetSyncStatus()
1593
1594
1595 def BlockdevGetsize(disks):
1596   """Computes the size of the given disks.
1597
1598   If a disk is not found, returns None instead.
1599
1600   @type disks: list of L{objects.Disk}
1601   @param disks: the list of disk to compute the size for
1602   @rtype: list
1603   @return: list with elements None if the disk cannot be found,
1604       otherwise the size
1605
1606   """
1607   result = []
1608   for cf in disks:
1609     try:
1610       rbd = _RecursiveFindBD(cf)
1611     except errors.BlockDeviceError:
1612       result.append(None)
1613       continue
1614     if rbd is None:
1615       result.append(None)
1616     else:
1617       result.append(rbd.GetActualSize())
1618   return result
1619
1620
1621 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1622   """Export a block device to a remote node.
1623
1624   @type disk: L{objects.Disk}
1625   @param disk: the description of the disk to export
1626   @type dest_node: str
1627   @param dest_node: the destination node to export to
1628   @type dest_path: str
1629   @param dest_path: the destination path on the target node
1630   @type cluster_name: str
1631   @param cluster_name: the cluster name, needed for SSH hostalias
1632   @rtype: None
1633
1634   """
1635   real_disk = _OpenRealBD(disk)
1636
1637   # the block size on the read dd is 1MiB to match our units
1638   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1639                                "dd if=%s bs=1048576 count=%s",
1640                                real_disk.dev_path, str(disk.size))
1641
1642   # we set here a smaller block size as, due to ssh buffering, more
1643   # than 64-128k will mostly ignored; we use nocreat to fail if the
1644   # device is not already there or we pass a wrong path; we use
1645   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1646   # to not buffer too much memory; this means that at best, we flush
1647   # every 64k, which will not be very fast
1648   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1649                                 " oflag=dsync", dest_path)
1650
1651   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1652                                                    constants.GANETI_RUNAS,
1653                                                    destcmd)
1654
1655   # all commands have been checked, so we're safe to combine them
1656   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1657
1658   result = utils.RunCmd(["bash", "-c", command])
1659
1660   if result.failed:
1661     _Fail("Disk copy command '%s' returned error: %s"
1662           " output: %s", command, result.fail_reason, result.output)
1663
1664
1665 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1666   """Write a file to the filesystem.
1667
1668   This allows the master to overwrite(!) a file. It will only perform
1669   the operation if the file belongs to a list of configuration files.
1670
1671   @type file_name: str
1672   @param file_name: the target file name
1673   @type data: str
1674   @param data: the new contents of the file
1675   @type mode: int
1676   @param mode: the mode to give the file (can be None)
1677   @type uid: int
1678   @param uid: the owner of the file (can be -1 for default)
1679   @type gid: int
1680   @param gid: the group of the file (can be -1 for default)
1681   @type atime: float
1682   @param atime: the atime to set on the file (can be None)
1683   @type mtime: float
1684   @param mtime: the mtime to set on the file (can be None)
1685   @rtype: None
1686
1687   """
1688   if not os.path.isabs(file_name):
1689     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1690
1691   if file_name not in _ALLOWED_UPLOAD_FILES:
1692     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1693           file_name)
1694
1695   raw_data = _Decompress(data)
1696
1697   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1698                   atime=atime, mtime=mtime)
1699
1700
1701 def WriteSsconfFiles(values):
1702   """Update all ssconf files.
1703
1704   Wrapper around the SimpleStore.WriteFiles.
1705
1706   """
1707   ssconf.SimpleStore().WriteFiles(values)
1708
1709
1710 def _ErrnoOrStr(err):
1711   """Format an EnvironmentError exception.
1712
1713   If the L{err} argument has an errno attribute, it will be looked up
1714   and converted into a textual C{E...} description. Otherwise the
1715   string representation of the error will be returned.
1716
1717   @type err: L{EnvironmentError}
1718   @param err: the exception to format
1719
1720   """
1721   if hasattr(err, 'errno'):
1722     detail = errno.errorcode[err.errno]
1723   else:
1724     detail = str(err)
1725   return detail
1726
1727
1728 def _OSOndiskAPIVersion(os_dir):
1729   """Compute and return the API version of a given OS.
1730
1731   This function will try to read the API version of the OS residing in
1732   the 'os_dir' directory.
1733
1734   @type os_dir: str
1735   @param os_dir: the directory in which we should look for the OS
1736   @rtype: tuple
1737   @return: tuple (status, data) with status denoting the validity and
1738       data holding either the vaid versions or an error message
1739
1740   """
1741   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1742
1743   try:
1744     st = os.stat(api_file)
1745   except EnvironmentError, err:
1746     return False, ("Required file '%s' not found under path %s: %s" %
1747                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1748
1749   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1750     return False, ("File '%s' in %s is not a regular file" %
1751                    (constants.OS_API_FILE, os_dir))
1752
1753   try:
1754     api_versions = utils.ReadFile(api_file).splitlines()
1755   except EnvironmentError, err:
1756     return False, ("Error while reading the API version file at %s: %s" %
1757                    (api_file, _ErrnoOrStr(err)))
1758
1759   try:
1760     api_versions = [int(version.strip()) for version in api_versions]
1761   except (TypeError, ValueError), err:
1762     return False, ("API version(s) can't be converted to integer: %s" %
1763                    str(err))
1764
1765   return True, api_versions
1766
1767
1768 def DiagnoseOS(top_dirs=None):
1769   """Compute the validity for all OSes.
1770
1771   @type top_dirs: list
1772   @param top_dirs: the list of directories in which to
1773       search (if not given defaults to
1774       L{constants.OS_SEARCH_PATH})
1775   @rtype: list of L{objects.OS}
1776   @return: a list of tuples (name, path, status, diagnose, variants)
1777       for all (potential) OSes under all search paths, where:
1778           - name is the (potential) OS name
1779           - path is the full path to the OS
1780           - status True/False is the validity of the OS
1781           - diagnose is the error message for an invalid OS, otherwise empty
1782           - variants is a list of supported OS variants, if any
1783
1784   """
1785   if top_dirs is None:
1786     top_dirs = constants.OS_SEARCH_PATH
1787
1788   result = []
1789   for dir_name in top_dirs:
1790     if os.path.isdir(dir_name):
1791       try:
1792         f_names = utils.ListVisibleFiles(dir_name)
1793       except EnvironmentError, err:
1794         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1795         break
1796       for name in f_names:
1797         os_path = utils.PathJoin(dir_name, name)
1798         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1799         if status:
1800           diagnose = ""
1801           variants = os_inst.supported_variants
1802         else:
1803           diagnose = os_inst
1804           variants = []
1805         result.append((name, os_path, status, diagnose, variants))
1806
1807   return result
1808
1809
1810 def _TryOSFromDisk(name, base_dir=None):
1811   """Create an OS instance from disk.
1812
1813   This function will return an OS instance if the given name is a
1814   valid OS name.
1815
1816   @type base_dir: string
1817   @keyword base_dir: Base directory containing OS installations.
1818                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1819   @rtype: tuple
1820   @return: success and either the OS instance if we find a valid one,
1821       or error message
1822
1823   """
1824   if base_dir is None:
1825     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1826   else:
1827     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1828
1829   if os_dir is None:
1830     return False, "Directory for OS %s not found in search path" % name
1831
1832   status, api_versions = _OSOndiskAPIVersion(os_dir)
1833   if not status:
1834     # push the error up
1835     return status, api_versions
1836
1837   if not constants.OS_API_VERSIONS.intersection(api_versions):
1838     return False, ("API version mismatch for path '%s': found %s, want %s." %
1839                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1840
1841   # OS Files dictionary, we will populate it with the absolute path names
1842   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1843
1844   if max(api_versions) >= constants.OS_API_V15:
1845     os_files[constants.OS_VARIANTS_FILE] = ''
1846
1847   for filename in os_files:
1848     os_files[filename] = utils.PathJoin(os_dir, filename)
1849
1850     try:
1851       st = os.stat(os_files[filename])
1852     except EnvironmentError, err:
1853       return False, ("File '%s' under path '%s' is missing (%s)" %
1854                      (filename, os_dir, _ErrnoOrStr(err)))
1855
1856     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1857       return False, ("File '%s' under path '%s' is not a regular file" %
1858                      (filename, os_dir))
1859
1860     if filename in constants.OS_SCRIPTS:
1861       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1862         return False, ("File '%s' under path '%s' is not executable" %
1863                        (filename, os_dir))
1864
1865   variants = None
1866   if constants.OS_VARIANTS_FILE in os_files:
1867     variants_file = os_files[constants.OS_VARIANTS_FILE]
1868     try:
1869       variants = utils.ReadFile(variants_file).splitlines()
1870     except EnvironmentError, err:
1871       return False, ("Error while reading the OS variants file at %s: %s" %
1872                      (variants_file, _ErrnoOrStr(err)))
1873     if not variants:
1874       return False, ("No supported os variant found")
1875
1876   os_obj = objects.OS(name=name, path=os_dir,
1877                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1878                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1879                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1880                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1881                       supported_variants=variants,
1882                       api_versions=api_versions)
1883   return True, os_obj
1884
1885
1886 def OSFromDisk(name, base_dir=None):
1887   """Create an OS instance from disk.
1888
1889   This function will return an OS instance if the given name is a
1890   valid OS name. Otherwise, it will raise an appropriate
1891   L{RPCFail} exception, detailing why this is not a valid OS.
1892
1893   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1894   an exception but returns true/false status data.
1895
1896   @type base_dir: string
1897   @keyword base_dir: Base directory containing OS installations.
1898                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1899   @rtype: L{objects.OS}
1900   @return: the OS instance if we find a valid one
1901   @raise RPCFail: if we don't find a valid OS
1902
1903   """
1904   name_only = name.split("+", 1)[0]
1905   status, payload = _TryOSFromDisk(name_only, base_dir)
1906
1907   if not status:
1908     _Fail(payload)
1909
1910   return payload
1911
1912
1913 def OSEnvironment(instance, inst_os, debug=0):
1914   """Calculate the environment for an os script.
1915
1916   @type instance: L{objects.Instance}
1917   @param instance: target instance for the os script run
1918   @type inst_os: L{objects.OS}
1919   @param inst_os: operating system for which the environment is being built
1920   @type debug: integer
1921   @param debug: debug level (0 or 1, for OS Api 10)
1922   @rtype: dict
1923   @return: dict of environment variables
1924   @raise errors.BlockDeviceError: if the block device
1925       cannot be found
1926
1927   """
1928   result = {}
1929   api_version = \
1930     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1931   result['OS_API_VERSION'] = '%d' % api_version
1932   result['INSTANCE_NAME'] = instance.name
1933   result['INSTANCE_OS'] = instance.os
1934   result['HYPERVISOR'] = instance.hypervisor
1935   result['DISK_COUNT'] = '%d' % len(instance.disks)
1936   result['NIC_COUNT'] = '%d' % len(instance.nics)
1937   result['DEBUG_LEVEL'] = '%d' % debug
1938   if api_version >= constants.OS_API_V15:
1939     try:
1940       variant = instance.os.split('+', 1)[1]
1941     except IndexError:
1942       variant = inst_os.supported_variants[0]
1943     result['OS_VARIANT'] = variant
1944   for idx, disk in enumerate(instance.disks):
1945     real_disk = _OpenRealBD(disk)
1946     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1947     result['DISK_%d_ACCESS' % idx] = disk.mode
1948     if constants.HV_DISK_TYPE in instance.hvparams:
1949       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1950         instance.hvparams[constants.HV_DISK_TYPE]
1951     if disk.dev_type in constants.LDS_BLOCK:
1952       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1953     elif disk.dev_type == constants.LD_FILE:
1954       result['DISK_%d_BACKEND_TYPE' % idx] = \
1955         'file:%s' % disk.physical_id[0]
1956   for idx, nic in enumerate(instance.nics):
1957     result['NIC_%d_MAC' % idx] = nic.mac
1958     if nic.ip:
1959       result['NIC_%d_IP' % idx] = nic.ip
1960     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1961     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1962       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1963     if nic.nicparams[constants.NIC_LINK]:
1964       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1965     if constants.HV_NIC_TYPE in instance.hvparams:
1966       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1967         instance.hvparams[constants.HV_NIC_TYPE]
1968
1969   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1970     for key, value in source.items():
1971       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1972
1973   return result
1974
1975
1976 def BlockdevGrow(disk, amount):
1977   """Grow a stack of block devices.
1978
1979   This function is called recursively, with the childrens being the
1980   first ones to resize.
1981
1982   @type disk: L{objects.Disk}
1983   @param disk: the disk to be grown
1984   @rtype: (status, result)
1985   @return: a tuple with the status of the operation
1986       (True/False), and the errors message if status
1987       is False
1988
1989   """
1990   r_dev = _RecursiveFindBD(disk)
1991   if r_dev is None:
1992     _Fail("Cannot find block device %s", disk)
1993
1994   try:
1995     r_dev.Grow(amount)
1996   except errors.BlockDeviceError, err:
1997     _Fail("Failed to grow block device: %s", err, exc=True)
1998
1999
2000 def BlockdevSnapshot(disk):
2001   """Create a snapshot copy of a block device.
2002
2003   This function is called recursively, and the snapshot is actually created
2004   just for the leaf lvm backend device.
2005
2006   @type disk: L{objects.Disk}
2007   @param disk: the disk to be snapshotted
2008   @rtype: string
2009   @return: snapshot disk path
2010
2011   """
2012   if disk.dev_type == constants.LD_DRBD8:
2013     if not disk.children:
2014       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2015             disk.unique_id)
2016     return BlockdevSnapshot(disk.children[0])
2017   elif disk.dev_type == constants.LD_LV:
2018     r_dev = _RecursiveFindBD(disk)
2019     if r_dev is not None:
2020       # FIXME: choose a saner value for the snapshot size
2021       # let's stay on the safe side and ask for the full size, for now
2022       return r_dev.Snapshot(disk.size)
2023     else:
2024       _Fail("Cannot find block device %s", disk)
2025   else:
2026     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2027           disk.unique_id, disk.dev_type)
2028
2029
2030 def FinalizeExport(instance, snap_disks):
2031   """Write out the export configuration information.
2032
2033   @type instance: L{objects.Instance}
2034   @param instance: the instance which we export, used for
2035       saving configuration
2036   @type snap_disks: list of L{objects.Disk}
2037   @param snap_disks: list of snapshot block devices, which
2038       will be used to get the actual name of the dump file
2039
2040   @rtype: None
2041
2042   """
2043   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2044   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2045
2046   config = objects.SerializableConfigParser()
2047
2048   config.add_section(constants.INISECT_EXP)
2049   config.set(constants.INISECT_EXP, 'version', '0')
2050   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2051   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2052   config.set(constants.INISECT_EXP, 'os', instance.os)
2053   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2054
2055   config.add_section(constants.INISECT_INS)
2056   config.set(constants.INISECT_INS, 'name', instance.name)
2057   config.set(constants.INISECT_INS, 'memory', '%d' %
2058              instance.beparams[constants.BE_MEMORY])
2059   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2060              instance.beparams[constants.BE_VCPUS])
2061   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2062   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2063
2064   nic_total = 0
2065   for nic_count, nic in enumerate(instance.nics):
2066     nic_total += 1
2067     config.set(constants.INISECT_INS, 'nic%d_mac' %
2068                nic_count, '%s' % nic.mac)
2069     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2070     for param in constants.NICS_PARAMETER_TYPES:
2071       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2072                  '%s' % nic.nicparams.get(param, None))
2073   # TODO: redundant: on load can read nics until it doesn't exist
2074   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2075
2076   disk_total = 0
2077   for disk_count, disk in enumerate(snap_disks):
2078     if disk:
2079       disk_total += 1
2080       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2081                  ('%s' % disk.iv_name))
2082       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2083                  ('%s' % disk.physical_id[1]))
2084       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2085                  ('%d' % disk.size))
2086
2087   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2088
2089   # New-style hypervisor/backend parameters
2090
2091   config.add_section(constants.INISECT_HYP)
2092   for name, value in instance.hvparams.items():
2093     if name not in constants.HVC_GLOBALS:
2094       config.set(constants.INISECT_HYP, name, str(value))
2095
2096   config.add_section(constants.INISECT_BEP)
2097   for name, value in instance.beparams.items():
2098     config.set(constants.INISECT_BEP, name, str(value))
2099
2100   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2101                   data=config.Dumps())
2102   shutil.rmtree(finaldestdir, ignore_errors=True)
2103   shutil.move(destdir, finaldestdir)
2104
2105
2106 def ExportInfo(dest):
2107   """Get export configuration information.
2108
2109   @type dest: str
2110   @param dest: directory containing the export
2111
2112   @rtype: L{objects.SerializableConfigParser}
2113   @return: a serializable config file containing the
2114       export info
2115
2116   """
2117   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2118
2119   config = objects.SerializableConfigParser()
2120   config.read(cff)
2121
2122   if (not config.has_section(constants.INISECT_EXP) or
2123       not config.has_section(constants.INISECT_INS)):
2124     _Fail("Export info file doesn't have the required fields")
2125
2126   return config.Dumps()
2127
2128
2129 def ListExports():
2130   """Return a list of exports currently available on this machine.
2131
2132   @rtype: list
2133   @return: list of the exports
2134
2135   """
2136   if os.path.isdir(constants.EXPORT_DIR):
2137     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2138   else:
2139     _Fail("No exports directory")
2140
2141
2142 def RemoveExport(export):
2143   """Remove an existing export from the node.
2144
2145   @type export: str
2146   @param export: the name of the export to remove
2147   @rtype: None
2148
2149   """
2150   target = utils.PathJoin(constants.EXPORT_DIR, export)
2151
2152   try:
2153     shutil.rmtree(target)
2154   except EnvironmentError, err:
2155     _Fail("Error while removing the export: %s", err, exc=True)
2156
2157
2158 def BlockdevRename(devlist):
2159   """Rename a list of block devices.
2160
2161   @type devlist: list of tuples
2162   @param devlist: list of tuples of the form  (disk,
2163       new_logical_id, new_physical_id); disk is an
2164       L{objects.Disk} object describing the current disk,
2165       and new logical_id/physical_id is the name we
2166       rename it to
2167   @rtype: boolean
2168   @return: True if all renames succeeded, False otherwise
2169
2170   """
2171   msgs = []
2172   result = True
2173   for disk, unique_id in devlist:
2174     dev = _RecursiveFindBD(disk)
2175     if dev is None:
2176       msgs.append("Can't find device %s in rename" % str(disk))
2177       result = False
2178       continue
2179     try:
2180       old_rpath = dev.dev_path
2181       dev.Rename(unique_id)
2182       new_rpath = dev.dev_path
2183       if old_rpath != new_rpath:
2184         DevCacheManager.RemoveCache(old_rpath)
2185         # FIXME: we should add the new cache information here, like:
2186         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2187         # but we don't have the owner here - maybe parse from existing
2188         # cache? for now, we only lose lvm data when we rename, which
2189         # is less critical than DRBD or MD
2190     except errors.BlockDeviceError, err:
2191       msgs.append("Can't rename device '%s' to '%s': %s" %
2192                   (dev, unique_id, err))
2193       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2194       result = False
2195   if not result:
2196     _Fail("; ".join(msgs))
2197
2198
2199 def _TransformFileStorageDir(file_storage_dir):
2200   """Checks whether given file_storage_dir is valid.
2201
2202   Checks wheter the given file_storage_dir is within the cluster-wide
2203   default file_storage_dir stored in SimpleStore. Only paths under that
2204   directory are allowed.
2205
2206   @type file_storage_dir: str
2207   @param file_storage_dir: the path to check
2208
2209   @return: the normalized path if valid, None otherwise
2210
2211   """
2212   if not constants.ENABLE_FILE_STORAGE:
2213     _Fail("File storage disabled at configure time")
2214   cfg = _GetConfig()
2215   file_storage_dir = os.path.normpath(file_storage_dir)
2216   base_file_storage_dir = cfg.GetFileStorageDir()
2217   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2218       base_file_storage_dir):
2219     _Fail("File storage directory '%s' is not under base file"
2220           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2221   return file_storage_dir
2222
2223
2224 def CreateFileStorageDir(file_storage_dir):
2225   """Create file storage directory.
2226
2227   @type file_storage_dir: str
2228   @param file_storage_dir: directory to create
2229
2230   @rtype: tuple
2231   @return: tuple with first element a boolean indicating wheter dir
2232       creation was successful or not
2233
2234   """
2235   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2236   if os.path.exists(file_storage_dir):
2237     if not os.path.isdir(file_storage_dir):
2238       _Fail("Specified storage dir '%s' is not a directory",
2239             file_storage_dir)
2240   else:
2241     try:
2242       os.makedirs(file_storage_dir, 0750)
2243     except OSError, err:
2244       _Fail("Cannot create file storage directory '%s': %s",
2245             file_storage_dir, err, exc=True)
2246
2247
2248 def RemoveFileStorageDir(file_storage_dir):
2249   """Remove file storage directory.
2250
2251   Remove it only if it's empty. If not log an error and return.
2252
2253   @type file_storage_dir: str
2254   @param file_storage_dir: the directory we should cleanup
2255   @rtype: tuple (success,)
2256   @return: tuple of one element, C{success}, denoting
2257       whether the operation was successful
2258
2259   """
2260   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2261   if os.path.exists(file_storage_dir):
2262     if not os.path.isdir(file_storage_dir):
2263       _Fail("Specified Storage directory '%s' is not a directory",
2264             file_storage_dir)
2265     # deletes dir only if empty, otherwise we want to fail the rpc call
2266     try:
2267       os.rmdir(file_storage_dir)
2268     except OSError, err:
2269       _Fail("Cannot remove file storage directory '%s': %s",
2270             file_storage_dir, err)
2271
2272
2273 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2274   """Rename the file storage directory.
2275
2276   @type old_file_storage_dir: str
2277   @param old_file_storage_dir: the current path
2278   @type new_file_storage_dir: str
2279   @param new_file_storage_dir: the name we should rename to
2280   @rtype: tuple (success,)
2281   @return: tuple of one element, C{success}, denoting
2282       whether the operation was successful
2283
2284   """
2285   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2286   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2287   if not os.path.exists(new_file_storage_dir):
2288     if os.path.isdir(old_file_storage_dir):
2289       try:
2290         os.rename(old_file_storage_dir, new_file_storage_dir)
2291       except OSError, err:
2292         _Fail("Cannot rename '%s' to '%s': %s",
2293               old_file_storage_dir, new_file_storage_dir, err)
2294     else:
2295       _Fail("Specified storage dir '%s' is not a directory",
2296             old_file_storage_dir)
2297   else:
2298     if os.path.exists(old_file_storage_dir):
2299       _Fail("Cannot rename '%s' to '%s': both locations exist",
2300             old_file_storage_dir, new_file_storage_dir)
2301
2302
2303 def _EnsureJobQueueFile(file_name):
2304   """Checks whether the given filename is in the queue directory.
2305
2306   @type file_name: str
2307   @param file_name: the file name we should check
2308   @rtype: None
2309   @raises RPCFail: if the file is not valid
2310
2311   """
2312   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2313   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2314
2315   if not result:
2316     _Fail("Passed job queue file '%s' does not belong to"
2317           " the queue directory '%s'", file_name, queue_dir)
2318
2319
2320 def JobQueueUpdate(file_name, content):
2321   """Updates a file in the queue directory.
2322
2323   This is just a wrapper over L{utils.WriteFile}, with proper
2324   checking.
2325
2326   @type file_name: str
2327   @param file_name: the job file name
2328   @type content: str
2329   @param content: the new job contents
2330   @rtype: boolean
2331   @return: the success of the operation
2332
2333   """
2334   _EnsureJobQueueFile(file_name)
2335
2336   # Write and replace the file atomically
2337   utils.WriteFile(file_name, data=_Decompress(content))
2338
2339
2340 def JobQueueRename(old, new):
2341   """Renames a job queue file.
2342
2343   This is just a wrapper over os.rename with proper checking.
2344
2345   @type old: str
2346   @param old: the old (actual) file name
2347   @type new: str
2348   @param new: the desired file name
2349   @rtype: tuple
2350   @return: the success of the operation and payload
2351
2352   """
2353   _EnsureJobQueueFile(old)
2354   _EnsureJobQueueFile(new)
2355
2356   utils.RenameFile(old, new, mkdir=True)
2357
2358
2359 def JobQueueSetDrainFlag(drain_flag):
2360   """Set the drain flag for the queue.
2361
2362   This will set or unset the queue drain flag.
2363
2364   @type drain_flag: boolean
2365   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2366   @rtype: truple
2367   @return: always True, None
2368   @warning: the function always returns True
2369
2370   """
2371   if drain_flag:
2372     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2373   else:
2374     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2375
2376
2377 def BlockdevClose(instance_name, disks):
2378   """Closes the given block devices.
2379
2380   This means they will be switched to secondary mode (in case of
2381   DRBD).
2382
2383   @param instance_name: if the argument is not empty, the symlinks
2384       of this instance will be removed
2385   @type disks: list of L{objects.Disk}
2386   @param disks: the list of disks to be closed
2387   @rtype: tuple (success, message)
2388   @return: a tuple of success and message, where success
2389       indicates the succes of the operation, and message
2390       which will contain the error details in case we
2391       failed
2392
2393   """
2394   bdevs = []
2395   for cf in disks:
2396     rd = _RecursiveFindBD(cf)
2397     if rd is None:
2398       _Fail("Can't find device %s", cf)
2399     bdevs.append(rd)
2400
2401   msg = []
2402   for rd in bdevs:
2403     try:
2404       rd.Close()
2405     except errors.BlockDeviceError, err:
2406       msg.append(str(err))
2407   if msg:
2408     _Fail("Can't make devices secondary: %s", ",".join(msg))
2409   else:
2410     if instance_name:
2411       _RemoveBlockDevLinks(instance_name, disks)
2412
2413
2414 def ValidateHVParams(hvname, hvparams):
2415   """Validates the given hypervisor parameters.
2416
2417   @type hvname: string
2418   @param hvname: the hypervisor name
2419   @type hvparams: dict
2420   @param hvparams: the hypervisor parameters to be validated
2421   @rtype: None
2422
2423   """
2424   try:
2425     hv_type = hypervisor.GetHypervisor(hvname)
2426     hv_type.ValidateParameters(hvparams)
2427   except errors.HypervisorError, err:
2428     _Fail(str(err), log=False)
2429
2430
2431 def DemoteFromMC():
2432   """Demotes the current node from master candidate role.
2433
2434   """
2435   # try to ensure we're not the master by mistake
2436   master, myself = ssconf.GetMasterAndMyself()
2437   if master == myself:
2438     _Fail("ssconf status shows I'm the master node, will not demote")
2439
2440   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2441   if not result.failed:
2442     _Fail("The master daemon is running, will not demote")
2443
2444   try:
2445     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2446       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2447   except EnvironmentError, err:
2448     if err.errno != errno.ENOENT:
2449       _Fail("Error while backing up cluster file: %s", err, exc=True)
2450
2451   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2452
2453
2454 def _GetX509Filenames(cryptodir, name):
2455   """Returns the full paths for the private key and certificate.
2456
2457   """
2458   return (utils.PathJoin(cryptodir, name),
2459           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2460           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2461
2462
2463 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2464   """Creates a new X509 certificate for SSL/TLS.
2465
2466   @type validity: int
2467   @param validity: Validity in seconds
2468   @rtype: tuple; (string, string)
2469   @return: Certificate name and public part
2470
2471   """
2472   (key_pem, cert_pem) = \
2473     utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
2474                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2475
2476   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2477                               prefix="x509-%s-" % utils.TimestampForFilename())
2478   try:
2479     name = os.path.basename(cert_dir)
2480     assert len(name) > 5
2481
2482     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2483
2484     utils.WriteFile(key_file, mode=0400, data=key_pem)
2485     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2486
2487     # Never return private key as it shouldn't leave the node
2488     return (name, cert_pem)
2489   except Exception:
2490     shutil.rmtree(cert_dir, ignore_errors=True)
2491     raise
2492
2493
2494 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2495   """Removes a X509 certificate.
2496
2497   @type name: string
2498   @param name: Certificate name
2499
2500   """
2501   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2502
2503   utils.RemoveFile(key_file)
2504   utils.RemoveFile(cert_file)
2505
2506   try:
2507     os.rmdir(cert_dir)
2508   except EnvironmentError, err:
2509     _Fail("Cannot remove certificate directory '%s': %s",
2510           cert_dir, err)
2511
2512
2513 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2514   """Returns the command for the requested input/output.
2515
2516   @type instance: L{objects.Instance}
2517   @param instance: The instance object
2518   @param mode: Import/export mode
2519   @param ieio: Input/output type
2520   @param ieargs: Input/output arguments
2521
2522   """
2523   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2524
2525   env = None
2526   prefix = None
2527   suffix = None
2528
2529   if ieio == constants.IEIO_FILE:
2530     (filename, ) = ieargs
2531
2532     if not utils.IsNormAbsPath(filename):
2533       _Fail("Path '%s' is not normalized or absolute", filename)
2534
2535     directory = os.path.normpath(os.path.dirname(filename))
2536
2537     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2538         constants.EXPORT_DIR):
2539       _Fail("File '%s' is not under exports directory '%s'",
2540             filename, constants.EXPORT_DIR)
2541
2542     # Create directory
2543     utils.Makedirs(directory, mode=0750)
2544
2545     quoted_filename = utils.ShellQuote(filename)
2546
2547     if mode == constants.IEM_IMPORT:
2548       suffix = "> %s" % quoted_filename
2549     elif mode == constants.IEM_EXPORT:
2550       suffix = "< %s" % quoted_filename
2551
2552   elif ieio == constants.IEIO_RAW_DISK:
2553     (disk, ) = ieargs
2554
2555     real_disk = _OpenRealBD(disk)
2556
2557     if mode == constants.IEM_IMPORT:
2558       # we set here a smaller block size as, due to transport buffering, more
2559       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2560       # is not already there or we pass a wrong path; we use notrunc to no
2561       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2562       # much memory; this means that at best, we flush every 64k, which will
2563       # not be very fast
2564       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2565                                     " bs=%s oflag=dsync"),
2566                                     real_disk.dev_path,
2567                                     str(64 * 1024))
2568
2569     elif mode == constants.IEM_EXPORT:
2570       # the block size on the read dd is 1MiB to match our units
2571       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2572                                    real_disk.dev_path,
2573                                    str(1024 * 1024), # 1 MB
2574                                    str(disk.size))
2575
2576   elif ieio == constants.IEIO_SCRIPT:
2577     (disk, disk_index, ) = ieargs
2578
2579     assert isinstance(disk_index, (int, long))
2580
2581     real_disk = _OpenRealBD(disk)
2582
2583     inst_os = OSFromDisk(instance.os)
2584     env = OSEnvironment(instance, inst_os)
2585
2586     if mode == constants.IEM_IMPORT:
2587       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2588       env["IMPORT_INDEX"] = str(disk_index)
2589       script = inst_os.import_script
2590
2591     elif mode == constants.IEM_EXPORT:
2592       env["EXPORT_DEVICE"] = real_disk.dev_path
2593       env["EXPORT_INDEX"] = str(disk_index)
2594       script = inst_os.export_script
2595
2596     # TODO: Pass special environment only to script
2597     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2598
2599     if mode == constants.IEM_IMPORT:
2600       suffix = "| %s" % script_cmd
2601
2602     elif mode == constants.IEM_EXPORT:
2603       prefix = "%s |" % script_cmd
2604
2605   else:
2606     _Fail("Invalid %s I/O mode %r", mode, ieio)
2607
2608   return (env, prefix, suffix)
2609
2610
2611 def _CreateImportExportStatusDir(prefix):
2612   """Creates status directory for import/export.
2613
2614   """
2615   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2616                           prefix=("%s-%s-" %
2617                                   (prefix, utils.TimestampForFilename())))
2618
2619
2620 def StartImportExportDaemon(mode, key_name, ca, host, port, instance,
2621                             ieio, ieioargs):
2622   """Starts an import or export daemon.
2623
2624   @param mode: Import/output mode
2625   @type key_name: string
2626   @param key_name: RSA key name (None to use cluster certificate)
2627   @type ca: string:
2628   @param ca: Remote CA in PEM format (None to use cluster certificate)
2629   @type host: string
2630   @param host: Remote host for export (None for import)
2631   @type port: int
2632   @param port: Remote port for export (None for import)
2633   @type instance: L{objects.Instance}
2634   @param instance: Instance object
2635   @param ieio: Input/output type
2636   @param ieioargs: Input/output arguments
2637
2638   """
2639   if mode == constants.IEM_IMPORT:
2640     prefix = "import"
2641
2642     if not (host is None and port is None):
2643       _Fail("Can not specify host or port on import")
2644
2645   elif mode == constants.IEM_EXPORT:
2646     prefix = "export"
2647
2648     if host is None or port is None:
2649       _Fail("Host and port must be specified for an export")
2650
2651   else:
2652     _Fail("Invalid mode %r", mode)
2653
2654   if (key_name is None) ^ (ca is None):
2655     _Fail("Cluster certificate can only be used for both key and CA")
2656
2657   (cmd_env, cmd_prefix, cmd_suffix) = \
2658     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2659
2660   if key_name is None:
2661     # Use server.pem
2662     key_path = constants.NODED_CERT_FILE
2663     cert_path = constants.NODED_CERT_FILE
2664     assert ca is None
2665   else:
2666     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2667                                                  key_name)
2668     assert ca is not None
2669
2670   for i in [key_path, cert_path]:
2671     if not os.path.exists(i):
2672       _Fail("File '%s' does not exist" % i)
2673
2674   status_dir = _CreateImportExportStatusDir(prefix)
2675   try:
2676     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2677     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2678     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2679
2680     if ca is None:
2681       # Use server.pem
2682       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2683
2684     utils.WriteFile(ca_file, data=ca, mode=0400)
2685
2686     cmd = [
2687       constants.IMPORT_EXPORT_DAEMON,
2688       status_file, mode,
2689       "--key=%s" % key_path,
2690       "--cert=%s" % cert_path,
2691       "--ca=%s" % ca_file,
2692       ]
2693
2694     if host:
2695       cmd.append("--host=%s" % host)
2696
2697     if port:
2698       cmd.append("--port=%s" % port)
2699
2700     if cmd_prefix:
2701       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2702
2703     if cmd_suffix:
2704       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2705
2706     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2707
2708     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2709     # support for receiving a file descriptor for output
2710     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2711                       output=logfile)
2712
2713     # The import/export name is simply the status directory name
2714     return os.path.basename(status_dir)
2715
2716   except Exception:
2717     shutil.rmtree(status_dir, ignore_errors=True)
2718     raise
2719
2720
2721 def GetImportExportStatus(names):
2722   """Returns import/export daemon status.
2723
2724   @type names: sequence
2725   @param names: List of names
2726   @rtype: List of dicts
2727   @return: Returns a list of the state of each named import/export or None if a
2728            status couldn't be read
2729
2730   """
2731   result = []
2732
2733   for name in names:
2734     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2735                                  _IES_STATUS_FILE)
2736
2737     try:
2738       data = utils.ReadFile(status_file)
2739     except EnvironmentError, err:
2740       if err.errno != errno.ENOENT:
2741         raise
2742       data = None
2743
2744     if not data:
2745       result.append(None)
2746       continue
2747
2748     result.append(serializer.LoadJson(data))
2749
2750   return result
2751
2752
2753 def AbortImportExport(name):
2754   """Sends SIGTERM to a running import/export daemon.
2755
2756   """
2757   logging.info("Abort import/export %s", name)
2758
2759   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2760   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2761
2762   if pid:
2763     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2764                  name, pid)
2765     os.kill(pid, signal.SIGTERM)
2766
2767
2768 def CleanupImportExport(name):
2769   """Cleanup after an import or export.
2770
2771   If the import/export daemon is still running it's killed. Afterwards the
2772   whole status directory is removed.
2773
2774   """
2775   logging.info("Finalizing import/export %s", name)
2776
2777   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2778
2779   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2780
2781   if pid:
2782     logging.info("Import/export %s is still running with PID %s",
2783                  name, pid)
2784     utils.KillProcess(pid, waitpid=False)
2785
2786   shutil.rmtree(status_dir, ignore_errors=True)
2787
2788
2789 def _FindDisks(nodes_ip, disks):
2790   """Sets the physical ID on disks and returns the block devices.
2791
2792   """
2793   # set the correct physical ID
2794   my_name = utils.HostInfo().name
2795   for cf in disks:
2796     cf.SetPhysicalID(my_name, nodes_ip)
2797
2798   bdevs = []
2799
2800   for cf in disks:
2801     rd = _RecursiveFindBD(cf)
2802     if rd is None:
2803       _Fail("Can't find device %s", cf)
2804     bdevs.append(rd)
2805   return bdevs
2806
2807
2808 def DrbdDisconnectNet(nodes_ip, disks):
2809   """Disconnects the network on a list of drbd devices.
2810
2811   """
2812   bdevs = _FindDisks(nodes_ip, disks)
2813
2814   # disconnect disks
2815   for rd in bdevs:
2816     try:
2817       rd.DisconnectNet()
2818     except errors.BlockDeviceError, err:
2819       _Fail("Can't change network configuration to standalone mode: %s",
2820             err, exc=True)
2821
2822
2823 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2824   """Attaches the network on a list of drbd devices.
2825
2826   """
2827   bdevs = _FindDisks(nodes_ip, disks)
2828
2829   if multimaster:
2830     for idx, rd in enumerate(bdevs):
2831       try:
2832         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2833       except EnvironmentError, err:
2834         _Fail("Can't create symlink: %s", err)
2835   # reconnect disks, switch to new master configuration and if
2836   # needed primary mode
2837   for rd in bdevs:
2838     try:
2839       rd.AttachNet(multimaster)
2840     except errors.BlockDeviceError, err:
2841       _Fail("Can't change network configuration: %s", err)
2842
2843   # wait until the disks are connected; we need to retry the re-attach
2844   # if the device becomes standalone, as this might happen if the one
2845   # node disconnects and reconnects in a different mode before the
2846   # other node reconnects; in this case, one or both of the nodes will
2847   # decide it has wrong configuration and switch to standalone
2848
2849   def _Attach():
2850     all_connected = True
2851
2852     for rd in bdevs:
2853       stats = rd.GetProcStatus()
2854
2855       all_connected = (all_connected and
2856                        (stats.is_connected or stats.is_in_resync))
2857
2858       if stats.is_standalone:
2859         # peer had different config info and this node became
2860         # standalone, even though this should not happen with the
2861         # new staged way of changing disk configs
2862         try:
2863           rd.AttachNet(multimaster)
2864         except errors.BlockDeviceError, err:
2865           _Fail("Can't change network configuration: %s", err)
2866
2867     if not all_connected:
2868       raise utils.RetryAgain()
2869
2870   try:
2871     # Start with a delay of 100 miliseconds and go up to 5 seconds
2872     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2873   except utils.RetryTimeout:
2874     _Fail("Timeout in disk reconnecting")
2875
2876   if multimaster:
2877     # change to primary mode
2878     for rd in bdevs:
2879       try:
2880         rd.Open()
2881       except errors.BlockDeviceError, err:
2882         _Fail("Can't change to primary mode: %s", err)
2883
2884
2885 def DrbdWaitSync(nodes_ip, disks):
2886   """Wait until DRBDs have synchronized.
2887
2888   """
2889   def _helper(rd):
2890     stats = rd.GetProcStatus()
2891     if not (stats.is_connected or stats.is_in_resync):
2892       raise utils.RetryAgain()
2893     return stats
2894
2895   bdevs = _FindDisks(nodes_ip, disks)
2896
2897   min_resync = 100
2898   alldone = True
2899   for rd in bdevs:
2900     try:
2901       # poll each second for 15 seconds
2902       stats = utils.Retry(_helper, 1, 15, args=[rd])
2903     except utils.RetryTimeout:
2904       stats = rd.GetProcStatus()
2905       # last check
2906       if not (stats.is_connected or stats.is_in_resync):
2907         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2908     alldone = alldone and (not stats.is_in_resync)
2909     if stats.sync_percent is not None:
2910       min_resync = min(min_resync, stats.sync_percent)
2911
2912   return (alldone, min_resync)
2913
2914
2915 def PowercycleNode(hypervisor_type):
2916   """Hard-powercycle the node.
2917
2918   Because we need to return first, and schedule the powercycle in the
2919   background, we won't be able to report failures nicely.
2920
2921   """
2922   hyper = hypervisor.GetHypervisor(hypervisor_type)
2923   try:
2924     pid = os.fork()
2925   except OSError:
2926     # if we can't fork, we'll pretend that we're in the child process
2927     pid = 0
2928   if pid > 0:
2929     return "Reboot scheduled in 5 seconds"
2930   # ensure the child is running on ram
2931   try:
2932     utils.Mlockall()
2933   except Exception: # pylint: disable-msg=W0703
2934     pass
2935   time.sleep(5)
2936   hyper.PowercycleNode()
2937
2938
2939 class HooksRunner(object):
2940   """Hook runner.
2941
2942   This class is instantiated on the node side (ganeti-noded) and not
2943   on the master side.
2944
2945   """
2946   def __init__(self, hooks_base_dir=None):
2947     """Constructor for hooks runner.
2948
2949     @type hooks_base_dir: str or None
2950     @param hooks_base_dir: if not None, this overrides the
2951         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2952
2953     """
2954     if hooks_base_dir is None:
2955       hooks_base_dir = constants.HOOKS_BASE_DIR
2956     # yeah, _BASE_DIR is not valid for attributes, we use it like a
2957     # constant
2958     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
2959
2960   def RunHooks(self, hpath, phase, env):
2961     """Run the scripts in the hooks directory.
2962
2963     @type hpath: str
2964     @param hpath: the path to the hooks directory which
2965         holds the scripts
2966     @type phase: str
2967     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2968         L{constants.HOOKS_PHASE_POST}
2969     @type env: dict
2970     @param env: dictionary with the environment for the hook
2971     @rtype: list
2972     @return: list of 3-element tuples:
2973       - script path
2974       - script result, either L{constants.HKR_SUCCESS} or
2975         L{constants.HKR_FAIL}
2976       - output of the script
2977
2978     @raise errors.ProgrammerError: for invalid input
2979         parameters
2980
2981     """
2982     if phase == constants.HOOKS_PHASE_PRE:
2983       suffix = "pre"
2984     elif phase == constants.HOOKS_PHASE_POST:
2985       suffix = "post"
2986     else:
2987       _Fail("Unknown hooks phase '%s'", phase)
2988
2989
2990     subdir = "%s-%s.d" % (hpath, suffix)
2991     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
2992
2993     results = []
2994
2995     if not os.path.isdir(dir_name):
2996       # for non-existing/non-dirs, we simply exit instead of logging a
2997       # warning at every operation
2998       return results
2999
3000     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3001
3002     for (relname, relstatus, runresult)  in runparts_results:
3003       if relstatus == constants.RUNPARTS_SKIP:
3004         rrval = constants.HKR_SKIP
3005         output = ""
3006       elif relstatus == constants.RUNPARTS_ERR:
3007         rrval = constants.HKR_FAIL
3008         output = "Hook script execution error: %s" % runresult
3009       elif relstatus == constants.RUNPARTS_RUN:
3010         if runresult.failed:
3011           rrval = constants.HKR_FAIL
3012         else:
3013           rrval = constants.HKR_SUCCESS
3014         output = utils.SafeEncode(runresult.output.strip())
3015       results.append(("%s/%s" % (subdir, relname), rrval, output))
3016
3017     return results
3018
3019
3020 class IAllocatorRunner(object):
3021   """IAllocator runner.
3022
3023   This class is instantiated on the node side (ganeti-noded) and not on
3024   the master side.
3025
3026   """
3027   @staticmethod
3028   def Run(name, idata):
3029     """Run an iallocator script.
3030
3031     @type name: str
3032     @param name: the iallocator script name
3033     @type idata: str
3034     @param idata: the allocator input data
3035
3036     @rtype: tuple
3037     @return: two element tuple of:
3038        - status
3039        - either error message or stdout of allocator (for success)
3040
3041     """
3042     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3043                                   os.path.isfile)
3044     if alloc_script is None:
3045       _Fail("iallocator module '%s' not found in the search path", name)
3046
3047     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3048     try:
3049       os.write(fd, idata)
3050       os.close(fd)
3051       result = utils.RunCmd([alloc_script, fin_name])
3052       if result.failed:
3053         _Fail("iallocator module '%s' failed: %s, output '%s'",
3054               name, result.fail_reason, result.output)
3055     finally:
3056       os.unlink(fin_name)
3057
3058     return result.stdout
3059
3060
3061 class DevCacheManager(object):
3062   """Simple class for managing a cache of block device information.
3063
3064   """
3065   _DEV_PREFIX = "/dev/"
3066   _ROOT_DIR = constants.BDEV_CACHE_DIR
3067
3068   @classmethod
3069   def _ConvertPath(cls, dev_path):
3070     """Converts a /dev/name path to the cache file name.
3071
3072     This replaces slashes with underscores and strips the /dev
3073     prefix. It then returns the full path to the cache file.
3074
3075     @type dev_path: str
3076     @param dev_path: the C{/dev/} path name
3077     @rtype: str
3078     @return: the converted path name
3079
3080     """
3081     if dev_path.startswith(cls._DEV_PREFIX):
3082       dev_path = dev_path[len(cls._DEV_PREFIX):]
3083     dev_path = dev_path.replace("/", "_")
3084     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3085     return fpath
3086
3087   @classmethod
3088   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3089     """Updates the cache information for a given device.
3090
3091     @type dev_path: str
3092     @param dev_path: the pathname of the device
3093     @type owner: str
3094     @param owner: the owner (instance name) of the device
3095     @type on_primary: bool
3096     @param on_primary: whether this is the primary
3097         node nor not
3098     @type iv_name: str
3099     @param iv_name: the instance-visible name of the
3100         device, as in objects.Disk.iv_name
3101
3102     @rtype: None
3103
3104     """
3105     if dev_path is None:
3106       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3107       return
3108     fpath = cls._ConvertPath(dev_path)
3109     if on_primary:
3110       state = "primary"
3111     else:
3112       state = "secondary"
3113     if iv_name is None:
3114       iv_name = "not_visible"
3115     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3116     try:
3117       utils.WriteFile(fpath, data=fdata)
3118     except EnvironmentError, err:
3119       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3120
3121   @classmethod
3122   def RemoveCache(cls, dev_path):
3123     """Remove data for a dev_path.
3124
3125     This is just a wrapper over L{utils.RemoveFile} with a converted
3126     path name and logging.
3127
3128     @type dev_path: str
3129     @param dev_path: the pathname of the device
3130
3131     @rtype: None
3132
3133     """
3134     if dev_path is None:
3135       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3136       return
3137     fpath = cls._ConvertPath(dev_path)
3138     try:
3139       utils.RemoveFile(fpath)
3140     except EnvironmentError, err:
3141       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)