OSFromDisk: handle variants when loading os
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26
27 """
28
29
30 import os
31 import os.path
32 import shutil
33 import time
34 import stat
35 import errno
36 import re
37 import subprocess
38 import random
39 import logging
40 import tempfile
41 import zlib
42 import base64
43
44 from ganeti import errors
45 from ganeti import utils
46 from ganeti import ssh
47 from ganeti import hypervisor
48 from ganeti import constants
49 from ganeti import bdev
50 from ganeti import objects
51 from ganeti import ssconf
52
53
54 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55
56
57 class RPCFail(Exception):
58   """Class denoting RPC failure.
59
60   Its argument is the error message.
61
62   """
63
64
65 def _Fail(msg, *args, **kwargs):
66   """Log an error and the raise an RPCFail exception.
67
68   This exception is then handled specially in the ganeti daemon and
69   turned into a 'failed' return type. As such, this function is a
70   useful shortcut for logging the error and returning it to the master
71   daemon.
72
73   @type msg: string
74   @param msg: the text of the exception
75   @raise RPCFail
76
77   """
78   if args:
79     msg = msg % args
80   if "log" not in kwargs or kwargs["log"]: # if we should log this error
81     if "exc" in kwargs and kwargs["exc"]:
82       logging.exception(msg)
83     else:
84       logging.error(msg)
85   raise RPCFail(msg)
86
87
88 def _GetConfig():
89   """Simple wrapper to return a SimpleStore.
90
91   @rtype: L{ssconf.SimpleStore}
92   @return: a SimpleStore instance
93
94   """
95   return ssconf.SimpleStore()
96
97
98 def _GetSshRunner(cluster_name):
99   """Simple wrapper to return an SshRunner.
100
101   @type cluster_name: str
102   @param cluster_name: the cluster name, which is needed
103       by the SshRunner constructor
104   @rtype: L{ssh.SshRunner}
105   @return: an SshRunner instance
106
107   """
108   return ssh.SshRunner(cluster_name)
109
110
111 def _Decompress(data):
112   """Unpacks data compressed by the RPC client.
113
114   @type data: list or tuple
115   @param data: Data sent by RPC client
116   @rtype: str
117   @return: Decompressed data
118
119   """
120   assert isinstance(data, (list, tuple))
121   assert len(data) == 2
122   (encoding, content) = data
123   if encoding == constants.RPC_ENCODING_NONE:
124     return content
125   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
126     return zlib.decompress(base64.b64decode(content))
127   else:
128     raise AssertionError("Unknown data encoding")
129
130
131 def _CleanDirectory(path, exclude=None):
132   """Removes all regular files in a directory.
133
134   @type path: str
135   @param path: the directory to clean
136   @type exclude: list
137   @param exclude: list of files to be excluded, defaults
138       to the empty list
139
140   """
141   if not os.path.isdir(path):
142     return
143   if exclude is None:
144     exclude = []
145   else:
146     # Normalize excluded paths
147     exclude = [os.path.normpath(i) for i in exclude]
148
149   for rel_name in utils.ListVisibleFiles(path):
150     full_name = os.path.normpath(os.path.join(path, rel_name))
151     if full_name in exclude:
152       continue
153     if os.path.isfile(full_name) and not os.path.islink(full_name):
154       utils.RemoveFile(full_name)
155
156
157 def _BuildUploadFileList():
158   """Build the list of allowed upload files.
159
160   This is abstracted so that it's built only once at module import time.
161
162   """
163   allowed_files = set([
164     constants.CLUSTER_CONF_FILE,
165     constants.ETC_HOSTS,
166     constants.SSH_KNOWN_HOSTS_FILE,
167     constants.VNC_PASSWORD_FILE,
168     constants.RAPI_CERT_FILE,
169     constants.RAPI_USERS_FILE,
170     constants.HMAC_CLUSTER_KEY,
171     ])
172
173   for hv_name in constants.HYPER_TYPES:
174     hv_class = hypervisor.GetHypervisorClass(hv_name)
175     allowed_files.update(hv_class.GetAncillaryFiles())
176
177   return frozenset(allowed_files)
178
179
180 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181
182
183 def JobQueuePurge():
184   """Removes job queue files and archived jobs.
185
186   @rtype: tuple
187   @return: True, None
188
189   """
190   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
191   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192
193
194 def GetMasterInfo():
195   """Returns master information.
196
197   This is an utility function to compute master information, either
198   for consumption here or from the node daemon.
199
200   @rtype: tuple
201   @return: master_netdev, master_ip, master_name
202   @raise RPCFail: in case of errors
203
204   """
205   try:
206     cfg = _GetConfig()
207     master_netdev = cfg.GetMasterNetdev()
208     master_ip = cfg.GetMasterIP()
209     master_node = cfg.GetMasterNode()
210   except errors.ConfigurationError, err:
211     _Fail("Cluster configuration incomplete: %s", err, exc=True)
212   return (master_netdev, master_ip, master_node)
213
214
215 def StartMaster(start_daemons, no_voting):
216   """Activate local node as master node.
217
218   The function will always try activate the IP address of the master
219   (unless someone else has it). It will also start the master daemons,
220   based on the start_daemons parameter.
221
222   @type start_daemons: boolean
223   @param start_daemons: whether to also start the master
224       daemons (ganeti-masterd and ganeti-rapi)
225   @type no_voting: boolean
226   @param no_voting: whether to start ganeti-masterd without a node vote
227       (if start_daemons is True), but still non-interactively
228   @rtype: None
229
230   """
231   # GetMasterInfo will raise an exception if not able to return data
232   master_netdev, master_ip, _ = GetMasterInfo()
233
234   err_msgs = []
235   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
236     if utils.OwnIpAddress(master_ip):
237       # we already have the ip:
238       logging.debug("Master IP already configured, doing nothing")
239     else:
240       msg = "Someone else has the master ip, not activating"
241       logging.error(msg)
242       err_msgs.append(msg)
243   else:
244     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
245                            "dev", master_netdev, "label",
246                            "%s:0" % master_netdev])
247     if result.failed:
248       msg = "Can't activate master IP: %s" % result.output
249       logging.error(msg)
250       err_msgs.append(msg)
251
252     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
253                            "-s", master_ip, master_ip])
254     # we'll ignore the exit code of arping
255
256   # and now start the master and rapi daemons
257   if start_daemons:
258     daemons_params = {
259         'ganeti-masterd': [],
260         'ganeti-rapi': [],
261         }
262     if no_voting:
263       daemons_params['ganeti-masterd'].append('--no-voting')
264       daemons_params['ganeti-masterd'].append('--yes-do-it')
265     for daemon in daemons_params:
266       cmd = [daemon]
267       cmd.extend(daemons_params[daemon])
268       result = utils.RunCmd(cmd)
269       if result.failed:
270         msg = "Can't start daemon %s: %s" % (daemon, result.output)
271         logging.error(msg)
272         err_msgs.append(msg)
273
274   if err_msgs:
275     _Fail("; ".join(err_msgs))
276
277
278 def StopMaster(stop_daemons):
279   """Deactivate this node as master.
280
281   The function will always try to deactivate the IP address of the
282   master. It will also stop the master daemons depending on the
283   stop_daemons parameter.
284
285   @type stop_daemons: boolean
286   @param stop_daemons: whether to also stop the master daemons
287       (ganeti-masterd and ganeti-rapi)
288   @rtype: None
289
290   """
291   # TODO: log and report back to the caller the error failures; we
292   # need to decide in which case we fail the RPC for this
293
294   # GetMasterInfo will raise an exception if not able to return data
295   master_netdev, master_ip, _ = GetMasterInfo()
296
297   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
298                          "dev", master_netdev])
299   if result.failed:
300     logging.error("Can't remove the master IP, error: %s", result.output)
301     # but otherwise ignore the failure
302
303   if stop_daemons:
304     # stop/kill the rapi and the master daemon
305     for daemon in constants.RAPI, constants.MASTERD:
306       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307
308
309 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
310   """Joins this node to the cluster.
311
312   This does the following:
313       - updates the hostkeys of the machine (rsa and dsa)
314       - adds the ssh private key to the user
315       - adds the ssh public key to the users' authorized_keys file
316
317   @type dsa: str
318   @param dsa: the DSA private key to write
319   @type dsapub: str
320   @param dsapub: the DSA public key to write
321   @type rsa: str
322   @param rsa: the RSA private key to write
323   @type rsapub: str
324   @param rsapub: the RSA public key to write
325   @type sshkey: str
326   @param sshkey: the SSH private key to write
327   @type sshpub: str
328   @param sshpub: the SSH public key to write
329   @rtype: boolean
330   @return: the success of the operation
331
332   """
333   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
334                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
335                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
336                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
337   for name, content, mode in sshd_keys:
338     utils.WriteFile(name, data=content, mode=mode)
339
340   try:
341     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
342                                                     mkdir=True)
343   except errors.OpExecError, err:
344     _Fail("Error while processing user ssh files: %s", err, exc=True)
345
346   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
347     utils.WriteFile(name, data=content, mode=0600)
348
349   utils.AddAuthorizedKey(auth_keys, sshpub)
350
351   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352
353
354 def LeaveCluster():
355   """Cleans up and remove the current node.
356
357   This function cleans up and prepares the current node to be removed
358   from the cluster.
359
360   If processing is successful, then it raises an
361   L{errors.QuitGanetiException} which is used as a special case to
362   shutdown the node daemon.
363
364   """
365   _CleanDirectory(constants.DATA_DIR)
366   JobQueuePurge()
367
368   try:
369     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
370
371     utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
372
373     utils.RemoveFile(priv_key)
374     utils.RemoveFile(pub_key)
375   except errors.OpExecError:
376     logging.exception("Error while processing ssh files")
377
378   try:
379     utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
380     utils.RemoveFile(constants.RAPI_CERT_FILE)
381     utils.RemoveFile(constants.SSL_CERT_FILE)
382   except:
383     logging.exception("Error while removing cluster secrets")
384
385   confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))
386
387   if confd_pid:
388     utils.KillProcess(confd_pid, timeout=2)
389
390   # Raise a custom exception (handled in ganeti-noded)
391   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
392
393
394 def GetNodeInfo(vgname, hypervisor_type):
395   """Gives back a hash with different information about the node.
396
397   @type vgname: C{string}
398   @param vgname: the name of the volume group to ask for disk space information
399   @type hypervisor_type: C{str}
400   @param hypervisor_type: the name of the hypervisor to ask for
401       memory information
402   @rtype: C{dict}
403   @return: dictionary with the following keys:
404       - vg_size is the size of the configured volume group in MiB
405       - vg_free is the free size of the volume group in MiB
406       - memory_dom0 is the memory allocated for domain0 in MiB
407       - memory_free is the currently available (free) ram in MiB
408       - memory_total is the total number of ram in MiB
409
410   """
411   outputarray = {}
412   vginfo = _GetVGInfo(vgname)
413   outputarray['vg_size'] = vginfo['vg_size']
414   outputarray['vg_free'] = vginfo['vg_free']
415
416   hyper = hypervisor.GetHypervisor(hypervisor_type)
417   hyp_info = hyper.GetNodeInfo()
418   if hyp_info is not None:
419     outputarray.update(hyp_info)
420
421   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
422
423   return outputarray
424
425
426 def VerifyNode(what, cluster_name):
427   """Verify the status of the local node.
428
429   Based on the input L{what} parameter, various checks are done on the
430   local node.
431
432   If the I{filelist} key is present, this list of
433   files is checksummed and the file/checksum pairs are returned.
434
435   If the I{nodelist} key is present, we check that we have
436   connectivity via ssh with the target nodes (and check the hostname
437   report).
438
439   If the I{node-net-test} key is present, we check that we have
440   connectivity to the given nodes via both primary IP and, if
441   applicable, secondary IPs.
442
443   @type what: C{dict}
444   @param what: a dictionary of things to check:
445       - filelist: list of files for which to compute checksums
446       - nodelist: list of nodes we should check ssh communication with
447       - node-net-test: list of nodes we should check node daemon port
448         connectivity with
449       - hypervisor: list with hypervisors to run the verify for
450   @rtype: dict
451   @return: a dictionary with the same keys as the input dict, and
452       values representing the result of the checks
453
454   """
455   result = {}
456
457   if constants.NV_HYPERVISOR in what:
458     result[constants.NV_HYPERVISOR] = tmp = {}
459     for hv_name in what[constants.NV_HYPERVISOR]:
460       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
461
462   if constants.NV_FILELIST in what:
463     result[constants.NV_FILELIST] = utils.FingerprintFiles(
464       what[constants.NV_FILELIST])
465
466   if constants.NV_NODELIST in what:
467     result[constants.NV_NODELIST] = tmp = {}
468     random.shuffle(what[constants.NV_NODELIST])
469     for node in what[constants.NV_NODELIST]:
470       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
471       if not success:
472         tmp[node] = message
473
474   if constants.NV_NODENETTEST in what:
475     result[constants.NV_NODENETTEST] = tmp = {}
476     my_name = utils.HostInfo().name
477     my_pip = my_sip = None
478     for name, pip, sip in what[constants.NV_NODENETTEST]:
479       if name == my_name:
480         my_pip = pip
481         my_sip = sip
482         break
483     if not my_pip:
484       tmp[my_name] = ("Can't find my own primary/secondary IP"
485                       " in the node list")
486     else:
487       port = utils.GetDaemonPort(constants.NODED)
488       for name, pip, sip in what[constants.NV_NODENETTEST]:
489         fail = []
490         if not utils.TcpPing(pip, port, source=my_pip):
491           fail.append("primary")
492         if sip != pip:
493           if not utils.TcpPing(sip, port, source=my_sip):
494             fail.append("secondary")
495         if fail:
496           tmp[name] = ("failure using the %s interface(s)" %
497                        " and ".join(fail))
498
499   if constants.NV_LVLIST in what:
500     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
501
502   if constants.NV_INSTANCELIST in what:
503     result[constants.NV_INSTANCELIST] = GetInstanceList(
504       what[constants.NV_INSTANCELIST])
505
506   if constants.NV_VGLIST in what:
507     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
508
509   if constants.NV_VERSION in what:
510     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
511                                     constants.RELEASE_VERSION)
512
513   if constants.NV_HVINFO in what:
514     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
515     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
516
517   if constants.NV_DRBDLIST in what:
518     try:
519       used_minors = bdev.DRBD8.GetUsedDevs().keys()
520     except errors.BlockDeviceError, err:
521       logging.warning("Can't get used minors list", exc_info=True)
522       used_minors = str(err)
523     result[constants.NV_DRBDLIST] = used_minors
524
525   return result
526
527
528 def GetVolumeList(vg_name):
529   """Compute list of logical volumes and their size.
530
531   @type vg_name: str
532   @param vg_name: the volume group whose LVs we should list
533   @rtype: dict
534   @return:
535       dictionary of all partions (key) with value being a tuple of
536       their size (in MiB), inactive and online status::
537
538         {'test1': ('20.06', True, True)}
539
540       in case of errors, a string is returned with the error
541       details.
542
543   """
544   lvs = {}
545   sep = '|'
546   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
547                          "--separator=%s" % sep,
548                          "-olv_name,lv_size,lv_attr", vg_name])
549   if result.failed:
550     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
551
552   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
553   for line in result.stdout.splitlines():
554     line = line.strip()
555     match = valid_line_re.match(line)
556     if not match:
557       logging.error("Invalid line returned from lvs output: '%s'", line)
558       continue
559     name, size, attr = match.groups()
560     inactive = attr[4] == '-'
561     online = attr[5] == 'o'
562     virtual = attr[0] == 'v'
563     if virtual:
564       # we don't want to report such volumes as existing, since they
565       # don't really hold data
566       continue
567     lvs[name] = (size, inactive, online)
568
569   return lvs
570
571
572 def ListVolumeGroups():
573   """List the volume groups and their size.
574
575   @rtype: dict
576   @return: dictionary with keys volume name and values the
577       size of the volume
578
579   """
580   return utils.ListVolumeGroups()
581
582
583 def NodeVolumes():
584   """List all volumes on this node.
585
586   @rtype: list
587   @return:
588     A list of dictionaries, each having four keys:
589       - name: the logical volume name,
590       - size: the size of the logical volume
591       - dev: the physical device on which the LV lives
592       - vg: the volume group to which it belongs
593
594     In case of errors, we return an empty list and log the
595     error.
596
597     Note that since a logical volume can live on multiple physical
598     volumes, the resulting list might include a logical volume
599     multiple times.
600
601   """
602   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
603                          "--separator=|",
604                          "--options=lv_name,lv_size,devices,vg_name"])
605   if result.failed:
606     _Fail("Failed to list logical volumes, lvs output: %s",
607           result.output)
608
609   def parse_dev(dev):
610     if '(' in dev:
611       return dev.split('(')[0]
612     else:
613       return dev
614
615   def map_line(line):
616     return {
617       'name': line[0].strip(),
618       'size': line[1].strip(),
619       'dev': parse_dev(line[2].strip()),
620       'vg': line[3].strip(),
621     }
622
623   return [map_line(line.split('|')) for line in result.stdout.splitlines()
624           if line.count('|') >= 3]
625
626
627 def BridgesExist(bridges_list):
628   """Check if a list of bridges exist on the current node.
629
630   @rtype: boolean
631   @return: C{True} if all of them exist, C{False} otherwise
632
633   """
634   missing = []
635   for bridge in bridges_list:
636     if not utils.BridgeExists(bridge):
637       missing.append(bridge)
638
639   if missing:
640     _Fail("Missing bridges %s", ", ".join(missing))
641
642
643 def GetInstanceList(hypervisor_list):
644   """Provides a list of instances.
645
646   @type hypervisor_list: list
647   @param hypervisor_list: the list of hypervisors to query information
648
649   @rtype: list
650   @return: a list of all running instances on the current node
651     - instance1.example.com
652     - instance2.example.com
653
654   """
655   results = []
656   for hname in hypervisor_list:
657     try:
658       names = hypervisor.GetHypervisor(hname).ListInstances()
659       results.extend(names)
660     except errors.HypervisorError, err:
661       _Fail("Error enumerating instances (hypervisor %s): %s",
662             hname, err, exc=True)
663
664   return results
665
666
667 def GetInstanceInfo(instance, hname):
668   """Gives back the information about an instance as a dictionary.
669
670   @type instance: string
671   @param instance: the instance name
672   @type hname: string
673   @param hname: the hypervisor type of the instance
674
675   @rtype: dict
676   @return: dictionary with the following keys:
677       - memory: memory size of instance (int)
678       - state: xen state of instance (string)
679       - time: cpu time of instance (float)
680
681   """
682   output = {}
683
684   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
685   if iinfo is not None:
686     output['memory'] = iinfo[2]
687     output['state'] = iinfo[4]
688     output['time'] = iinfo[5]
689
690   return output
691
692
693 def GetInstanceMigratable(instance):
694   """Gives whether an instance can be migrated.
695
696   @type instance: L{objects.Instance}
697   @param instance: object representing the instance to be checked.
698
699   @rtype: tuple
700   @return: tuple of (result, description) where:
701       - result: whether the instance can be migrated or not
702       - description: a description of the issue, if relevant
703
704   """
705   hyper = hypervisor.GetHypervisor(instance.hypervisor)
706   iname = instance.name
707   if iname not in hyper.ListInstances():
708     _Fail("Instance %s is not running", iname)
709
710   for idx in range(len(instance.disks)):
711     link_name = _GetBlockDevSymlinkPath(iname, idx)
712     if not os.path.islink(link_name):
713       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
714
715
716 def GetAllInstancesInfo(hypervisor_list):
717   """Gather data about all instances.
718
719   This is the equivalent of L{GetInstanceInfo}, except that it
720   computes data for all instances at once, thus being faster if one
721   needs data about more than one instance.
722
723   @type hypervisor_list: list
724   @param hypervisor_list: list of hypervisors to query for instance data
725
726   @rtype: dict
727   @return: dictionary of instance: data, with data having the following keys:
728       - memory: memory size of instance (int)
729       - state: xen state of instance (string)
730       - time: cpu time of instance (float)
731       - vcpus: the number of vcpus
732
733   """
734   output = {}
735
736   for hname in hypervisor_list:
737     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
738     if iinfo:
739       for name, _, memory, vcpus, state, times in iinfo:
740         value = {
741           'memory': memory,
742           'vcpus': vcpus,
743           'state': state,
744           'time': times,
745           }
746         if name in output:
747           # we only check static parameters, like memory and vcpus,
748           # and not state and time which can change between the
749           # invocations of the different hypervisors
750           for key in 'memory', 'vcpus':
751             if value[key] != output[name][key]:
752               _Fail("Instance %s is running twice"
753                     " with different parameters", name)
754         output[name] = value
755
756   return output
757
758
759 def InstanceOsAdd(instance, reinstall):
760   """Add an OS to an instance.
761
762   @type instance: L{objects.Instance}
763   @param instance: Instance whose OS is to be installed
764   @type reinstall: boolean
765   @param reinstall: whether this is an instance reinstall
766   @rtype: None
767
768   """
769   inst_os = OSFromDisk(instance.os)
770
771   create_env = OSEnvironment(instance, inst_os)
772   if reinstall:
773     create_env['INSTANCE_REINSTALL'] = "1"
774
775   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
776                                      instance.name, int(time.time()))
777
778   result = utils.RunCmd([inst_os.create_script], env=create_env,
779                         cwd=inst_os.path, output=logfile,)
780   if result.failed:
781     logging.error("os create command '%s' returned error: %s, logfile: %s,"
782                   " output: %s", result.cmd, result.fail_reason, logfile,
783                   result.output)
784     lines = [utils.SafeEncode(val)
785              for val in utils.TailFile(logfile, lines=20)]
786     _Fail("OS create script failed (%s), last lines in the"
787           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
788
789
790 def RunRenameInstance(instance, old_name):
791   """Run the OS rename script for an instance.
792
793   @type instance: L{objects.Instance}
794   @param instance: Instance whose OS is to be installed
795   @type old_name: string
796   @param old_name: previous instance name
797   @rtype: boolean
798   @return: the success of the operation
799
800   """
801   inst_os = OSFromDisk(instance.os)
802
803   rename_env = OSEnvironment(instance, inst_os)
804   rename_env['OLD_INSTANCE_NAME'] = old_name
805
806   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
807                                            old_name,
808                                            instance.name, int(time.time()))
809
810   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
811                         cwd=inst_os.path, output=logfile)
812
813   if result.failed:
814     logging.error("os create command '%s' returned error: %s output: %s",
815                   result.cmd, result.fail_reason, result.output)
816     lines = [utils.SafeEncode(val)
817              for val in utils.TailFile(logfile, lines=20)]
818     _Fail("OS rename script failed (%s), last lines in the"
819           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
820
821
822 def _GetVGInfo(vg_name):
823   """Get information about the volume group.
824
825   @type vg_name: str
826   @param vg_name: the volume group which we query
827   @rtype: dict
828   @return:
829     A dictionary with the following keys:
830       - C{vg_size} is the total size of the volume group in MiB
831       - C{vg_free} is the free size of the volume group in MiB
832       - C{pv_count} are the number of physical disks in that VG
833
834     If an error occurs during gathering of data, we return the same dict
835     with keys all set to None.
836
837   """
838   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
839
840   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
841                          "--nosuffix", "--units=m", "--separator=:", vg_name])
842
843   if retval.failed:
844     logging.error("volume group %s not present", vg_name)
845     return retdic
846   valarr = retval.stdout.strip().rstrip(':').split(':')
847   if len(valarr) == 3:
848     try:
849       retdic = {
850         "vg_size": int(round(float(valarr[0]), 0)),
851         "vg_free": int(round(float(valarr[1]), 0)),
852         "pv_count": int(valarr[2]),
853         }
854     except ValueError, err:
855       logging.exception("Fail to parse vgs output: %s", err)
856   else:
857     logging.error("vgs output has the wrong number of fields (expected"
858                   " three): %s", str(valarr))
859   return retdic
860
861
862 def _GetBlockDevSymlinkPath(instance_name, idx):
863   return os.path.join(constants.DISK_LINKS_DIR,
864                       "%s:%d" % (instance_name, idx))
865
866
867 def _SymlinkBlockDev(instance_name, device_path, idx):
868   """Set up symlinks to a instance's block device.
869
870   This is an auxiliary function run when an instance is start (on the primary
871   node) or when an instance is migrated (on the target node).
872
873
874   @param instance_name: the name of the target instance
875   @param device_path: path of the physical block device, on the node
876   @param idx: the disk index
877   @return: absolute path to the disk's symlink
878
879   """
880   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
881   try:
882     os.symlink(device_path, link_name)
883   except OSError, err:
884     if err.errno == errno.EEXIST:
885       if (not os.path.islink(link_name) or
886           os.readlink(link_name) != device_path):
887         os.remove(link_name)
888         os.symlink(device_path, link_name)
889     else:
890       raise
891
892   return link_name
893
894
895 def _RemoveBlockDevLinks(instance_name, disks):
896   """Remove the block device symlinks belonging to the given instance.
897
898   """
899   for idx, _ in enumerate(disks):
900     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
901     if os.path.islink(link_name):
902       try:
903         os.remove(link_name)
904       except OSError:
905         logging.exception("Can't remove symlink '%s'", link_name)
906
907
908 def _GatherAndLinkBlockDevs(instance):
909   """Set up an instance's block device(s).
910
911   This is run on the primary node at instance startup. The block
912   devices must be already assembled.
913
914   @type instance: L{objects.Instance}
915   @param instance: the instance whose disks we shoul assemble
916   @rtype: list
917   @return: list of (disk_object, device_path)
918
919   """
920   block_devices = []
921   for idx, disk in enumerate(instance.disks):
922     device = _RecursiveFindBD(disk)
923     if device is None:
924       raise errors.BlockDeviceError("Block device '%s' is not set up." %
925                                     str(disk))
926     device.Open()
927     try:
928       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
929     except OSError, e:
930       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
931                                     e.strerror)
932
933     block_devices.append((disk, link_name))
934
935   return block_devices
936
937
938 def StartInstance(instance):
939   """Start an instance.
940
941   @type instance: L{objects.Instance}
942   @param instance: the instance object
943   @rtype: None
944
945   """
946   running_instances = GetInstanceList([instance.hypervisor])
947
948   if instance.name in running_instances:
949     logging.info("Instance %s already running, not starting", instance.name)
950     return
951
952   try:
953     block_devices = _GatherAndLinkBlockDevs(instance)
954     hyper = hypervisor.GetHypervisor(instance.hypervisor)
955     hyper.StartInstance(instance, block_devices)
956   except errors.BlockDeviceError, err:
957     _Fail("Block device error: %s", err, exc=True)
958   except errors.HypervisorError, err:
959     _RemoveBlockDevLinks(instance.name, instance.disks)
960     _Fail("Hypervisor error: %s", err, exc=True)
961
962
963 def InstanceShutdown(instance):
964   """Shut an instance down.
965
966   @note: this functions uses polling with a hardcoded timeout.
967
968   @type instance: L{objects.Instance}
969   @param instance: the instance object
970   @rtype: None
971
972   """
973   hv_name = instance.hypervisor
974   running_instances = GetInstanceList([hv_name])
975   iname = instance.name
976
977   if iname not in running_instances:
978     logging.info("Instance %s not running, doing nothing", iname)
979     return
980
981   hyper = hypervisor.GetHypervisor(hv_name)
982   try:
983     hyper.StopInstance(instance)
984   except errors.HypervisorError, err:
985     _Fail("Failed to stop instance %s: %s", iname, err)
986
987   # test every 10secs for 2min
988
989   time.sleep(1)
990   for _ in range(11):
991     if instance.name not in GetInstanceList([hv_name]):
992       break
993     time.sleep(10)
994   else:
995     # the shutdown did not succeed
996     logging.error("Shutdown of '%s' unsuccessful, using destroy", iname)
997
998     try:
999       hyper.StopInstance(instance, force=True)
1000     except errors.HypervisorError, err:
1001       _Fail("Failed to force stop instance %s: %s", iname, err)
1002
1003     time.sleep(1)
1004     if instance.name in GetInstanceList([hv_name]):
1005       _Fail("Could not shutdown instance %s even by destroy", iname)
1006
1007   _RemoveBlockDevLinks(iname, instance.disks)
1008
1009
1010 def InstanceReboot(instance, reboot_type):
1011   """Reboot an instance.
1012
1013   @type instance: L{objects.Instance}
1014   @param instance: the instance object to reboot
1015   @type reboot_type: str
1016   @param reboot_type: the type of reboot, one the following
1017     constants:
1018       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1019         instance OS, do not recreate the VM
1020       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1021         restart the VM (at the hypervisor level)
1022       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1023         not accepted here, since that mode is handled differently, in
1024         cmdlib, and translates into full stop and start of the
1025         instance (instead of a call_instance_reboot RPC)
1026   @rtype: None
1027
1028   """
1029   running_instances = GetInstanceList([instance.hypervisor])
1030
1031   if instance.name not in running_instances:
1032     _Fail("Cannot reboot instance %s that is not running", instance.name)
1033
1034   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1035   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1036     try:
1037       hyper.RebootInstance(instance)
1038     except errors.HypervisorError, err:
1039       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1040   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1041     try:
1042       InstanceShutdown(instance)
1043       return StartInstance(instance)
1044     except errors.HypervisorError, err:
1045       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1046   else:
1047     _Fail("Invalid reboot_type received: %s", reboot_type)
1048
1049
1050 def MigrationInfo(instance):
1051   """Gather information about an instance to be migrated.
1052
1053   @type instance: L{objects.Instance}
1054   @param instance: the instance definition
1055
1056   """
1057   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1058   try:
1059     info = hyper.MigrationInfo(instance)
1060   except errors.HypervisorError, err:
1061     _Fail("Failed to fetch migration information: %s", err, exc=True)
1062   return info
1063
1064
1065 def AcceptInstance(instance, info, target):
1066   """Prepare the node to accept an instance.
1067
1068   @type instance: L{objects.Instance}
1069   @param instance: the instance definition
1070   @type info: string/data (opaque)
1071   @param info: migration information, from the source node
1072   @type target: string
1073   @param target: target host (usually ip), on this node
1074
1075   """
1076   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1077   try:
1078     hyper.AcceptInstance(instance, info, target)
1079   except errors.HypervisorError, err:
1080     _Fail("Failed to accept instance: %s", err, exc=True)
1081
1082
1083 def FinalizeMigration(instance, info, success):
1084   """Finalize any preparation to accept an instance.
1085
1086   @type instance: L{objects.Instance}
1087   @param instance: the instance definition
1088   @type info: string/data (opaque)
1089   @param info: migration information, from the source node
1090   @type success: boolean
1091   @param success: whether the migration was a success or a failure
1092
1093   """
1094   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1095   try:
1096     hyper.FinalizeMigration(instance, info, success)
1097   except errors.HypervisorError, err:
1098     _Fail("Failed to finalize migration: %s", err, exc=True)
1099
1100
1101 def MigrateInstance(instance, target, live):
1102   """Migrates an instance to another node.
1103
1104   @type instance: L{objects.Instance}
1105   @param instance: the instance definition
1106   @type target: string
1107   @param target: the target node name
1108   @type live: boolean
1109   @param live: whether the migration should be done live or not (the
1110       interpretation of this parameter is left to the hypervisor)
1111   @rtype: tuple
1112   @return: a tuple of (success, msg) where:
1113       - succes is a boolean denoting the success/failure of the operation
1114       - msg is a string with details in case of failure
1115
1116   """
1117   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1118
1119   try:
1120     hyper.MigrateInstance(instance.name, target, live)
1121   except errors.HypervisorError, err:
1122     _Fail("Failed to migrate instance: %s", err, exc=True)
1123
1124
1125 def BlockdevCreate(disk, size, owner, on_primary, info):
1126   """Creates a block device for an instance.
1127
1128   @type disk: L{objects.Disk}
1129   @param disk: the object describing the disk we should create
1130   @type size: int
1131   @param size: the size of the physical underlying device, in MiB
1132   @type owner: str
1133   @param owner: the name of the instance for which disk is created,
1134       used for device cache data
1135   @type on_primary: boolean
1136   @param on_primary:  indicates if it is the primary node or not
1137   @type info: string
1138   @param info: string that will be sent to the physical device
1139       creation, used for example to set (LVM) tags on LVs
1140
1141   @return: the new unique_id of the device (this can sometime be
1142       computed only after creation), or None. On secondary nodes,
1143       it's not required to return anything.
1144
1145   """
1146   clist = []
1147   if disk.children:
1148     for child in disk.children:
1149       try:
1150         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1151       except errors.BlockDeviceError, err:
1152         _Fail("Can't assemble device %s: %s", child, err)
1153       if on_primary or disk.AssembleOnSecondary():
1154         # we need the children open in case the device itself has to
1155         # be assembled
1156         try:
1157           crdev.Open()
1158         except errors.BlockDeviceError, err:
1159           _Fail("Can't make child '%s' read-write: %s", child, err)
1160       clist.append(crdev)
1161
1162   try:
1163     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1164   except errors.BlockDeviceError, err:
1165     _Fail("Can't create block device: %s", err)
1166
1167   if on_primary or disk.AssembleOnSecondary():
1168     try:
1169       device.Assemble()
1170     except errors.BlockDeviceError, err:
1171       _Fail("Can't assemble device after creation, unusual event: %s", err)
1172     device.SetSyncSpeed(constants.SYNC_SPEED)
1173     if on_primary or disk.OpenOnSecondary():
1174       try:
1175         device.Open(force=True)
1176       except errors.BlockDeviceError, err:
1177         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1178     DevCacheManager.UpdateCache(device.dev_path, owner,
1179                                 on_primary, disk.iv_name)
1180
1181   device.SetInfo(info)
1182
1183   return device.unique_id
1184
1185
1186 def BlockdevRemove(disk):
1187   """Remove a block device.
1188
1189   @note: This is intended to be called recursively.
1190
1191   @type disk: L{objects.Disk}
1192   @param disk: the disk object we should remove
1193   @rtype: boolean
1194   @return: the success of the operation
1195
1196   """
1197   msgs = []
1198   try:
1199     rdev = _RecursiveFindBD(disk)
1200   except errors.BlockDeviceError, err:
1201     # probably can't attach
1202     logging.info("Can't attach to device %s in remove", disk)
1203     rdev = None
1204   if rdev is not None:
1205     r_path = rdev.dev_path
1206     try:
1207       rdev.Remove()
1208     except errors.BlockDeviceError, err:
1209       msgs.append(str(err))
1210     if not msgs:
1211       DevCacheManager.RemoveCache(r_path)
1212
1213   if disk.children:
1214     for child in disk.children:
1215       try:
1216         BlockdevRemove(child)
1217       except RPCFail, err:
1218         msgs.append(str(err))
1219
1220   if msgs:
1221     _Fail("; ".join(msgs))
1222
1223
1224 def _RecursiveAssembleBD(disk, owner, as_primary):
1225   """Activate a block device for an instance.
1226
1227   This is run on the primary and secondary nodes for an instance.
1228
1229   @note: this function is called recursively.
1230
1231   @type disk: L{objects.Disk}
1232   @param disk: the disk we try to assemble
1233   @type owner: str
1234   @param owner: the name of the instance which owns the disk
1235   @type as_primary: boolean
1236   @param as_primary: if we should make the block device
1237       read/write
1238
1239   @return: the assembled device or None (in case no device
1240       was assembled)
1241   @raise errors.BlockDeviceError: in case there is an error
1242       during the activation of the children or the device
1243       itself
1244
1245   """
1246   children = []
1247   if disk.children:
1248     mcn = disk.ChildrenNeeded()
1249     if mcn == -1:
1250       mcn = 0 # max number of Nones allowed
1251     else:
1252       mcn = len(disk.children) - mcn # max number of Nones
1253     for chld_disk in disk.children:
1254       try:
1255         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1256       except errors.BlockDeviceError, err:
1257         if children.count(None) >= mcn:
1258           raise
1259         cdev = None
1260         logging.error("Error in child activation (but continuing): %s",
1261                       str(err))
1262       children.append(cdev)
1263
1264   if as_primary or disk.AssembleOnSecondary():
1265     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1266     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1267     result = r_dev
1268     if as_primary or disk.OpenOnSecondary():
1269       r_dev.Open()
1270     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1271                                 as_primary, disk.iv_name)
1272
1273   else:
1274     result = True
1275   return result
1276
1277
1278 def BlockdevAssemble(disk, owner, as_primary):
1279   """Activate a block device for an instance.
1280
1281   This is a wrapper over _RecursiveAssembleBD.
1282
1283   @rtype: str or boolean
1284   @return: a C{/dev/...} path for primary nodes, and
1285       C{True} for secondary nodes
1286
1287   """
1288   try:
1289     result = _RecursiveAssembleBD(disk, owner, as_primary)
1290     if isinstance(result, bdev.BlockDev):
1291       result = result.dev_path
1292   except errors.BlockDeviceError, err:
1293     _Fail("Error while assembling disk: %s", err, exc=True)
1294
1295   return result
1296
1297
1298 def BlockdevShutdown(disk):
1299   """Shut down a block device.
1300
1301   First, if the device is assembled (Attach() is successful), then
1302   the device is shutdown. Then the children of the device are
1303   shutdown.
1304
1305   This function is called recursively. Note that we don't cache the
1306   children or such, as oppossed to assemble, shutdown of different
1307   devices doesn't require that the upper device was active.
1308
1309   @type disk: L{objects.Disk}
1310   @param disk: the description of the disk we should
1311       shutdown
1312   @rtype: None
1313
1314   """
1315   msgs = []
1316   r_dev = _RecursiveFindBD(disk)
1317   if r_dev is not None:
1318     r_path = r_dev.dev_path
1319     try:
1320       r_dev.Shutdown()
1321       DevCacheManager.RemoveCache(r_path)
1322     except errors.BlockDeviceError, err:
1323       msgs.append(str(err))
1324
1325   if disk.children:
1326     for child in disk.children:
1327       try:
1328         BlockdevShutdown(child)
1329       except RPCFail, err:
1330         msgs.append(str(err))
1331
1332   if msgs:
1333     _Fail("; ".join(msgs))
1334
1335
1336 def BlockdevAddchildren(parent_cdev, new_cdevs):
1337   """Extend a mirrored block device.
1338
1339   @type parent_cdev: L{objects.Disk}
1340   @param parent_cdev: the disk to which we should add children
1341   @type new_cdevs: list of L{objects.Disk}
1342   @param new_cdevs: the list of children which we should add
1343   @rtype: None
1344
1345   """
1346   parent_bdev = _RecursiveFindBD(parent_cdev)
1347   if parent_bdev is None:
1348     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1349   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1350   if new_bdevs.count(None) > 0:
1351     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1352   parent_bdev.AddChildren(new_bdevs)
1353
1354
1355 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1356   """Shrink a mirrored block device.
1357
1358   @type parent_cdev: L{objects.Disk}
1359   @param parent_cdev: the disk from which we should remove children
1360   @type new_cdevs: list of L{objects.Disk}
1361   @param new_cdevs: the list of children which we should remove
1362   @rtype: None
1363
1364   """
1365   parent_bdev = _RecursiveFindBD(parent_cdev)
1366   if parent_bdev is None:
1367     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1368   devs = []
1369   for disk in new_cdevs:
1370     rpath = disk.StaticDevPath()
1371     if rpath is None:
1372       bd = _RecursiveFindBD(disk)
1373       if bd is None:
1374         _Fail("Can't find device %s while removing children", disk)
1375       else:
1376         devs.append(bd.dev_path)
1377     else:
1378       devs.append(rpath)
1379   parent_bdev.RemoveChildren(devs)
1380
1381
1382 def BlockdevGetmirrorstatus(disks):
1383   """Get the mirroring status of a list of devices.
1384
1385   @type disks: list of L{objects.Disk}
1386   @param disks: the list of disks which we should query
1387   @rtype: disk
1388   @return:
1389       a list of (mirror_done, estimated_time) tuples, which
1390       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1391   @raise errors.BlockDeviceError: if any of the disks cannot be
1392       found
1393
1394   """
1395   stats = []
1396   for dsk in disks:
1397     rbd = _RecursiveFindBD(dsk)
1398     if rbd is None:
1399       _Fail("Can't find device %s", dsk)
1400
1401     stats.append(rbd.CombinedSyncStatus())
1402
1403   return stats
1404
1405
1406 def _RecursiveFindBD(disk):
1407   """Check if a device is activated.
1408
1409   If so, return information about the real device.
1410
1411   @type disk: L{objects.Disk}
1412   @param disk: the disk object we need to find
1413
1414   @return: None if the device can't be found,
1415       otherwise the device instance
1416
1417   """
1418   children = []
1419   if disk.children:
1420     for chdisk in disk.children:
1421       children.append(_RecursiveFindBD(chdisk))
1422
1423   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1424
1425
1426 def BlockdevFind(disk):
1427   """Check if a device is activated.
1428
1429   If it is, return information about the real device.
1430
1431   @type disk: L{objects.Disk}
1432   @param disk: the disk to find
1433   @rtype: None or objects.BlockDevStatus
1434   @return: None if the disk cannot be found, otherwise a the current
1435            information
1436
1437   """
1438   try:
1439     rbd = _RecursiveFindBD(disk)
1440   except errors.BlockDeviceError, err:
1441     _Fail("Failed to find device: %s", err, exc=True)
1442
1443   if rbd is None:
1444     return None
1445
1446   return rbd.GetSyncStatus()
1447
1448
1449 def BlockdevGetsize(disks):
1450   """Computes the size of the given disks.
1451
1452   If a disk is not found, returns None instead.
1453
1454   @type disks: list of L{objects.Disk}
1455   @param disks: the list of disk to compute the size for
1456   @rtype: list
1457   @return: list with elements None if the disk cannot be found,
1458       otherwise the size
1459
1460   """
1461   result = []
1462   for cf in disks:
1463     try:
1464       rbd = _RecursiveFindBD(cf)
1465     except errors.BlockDeviceError, err:
1466       result.append(None)
1467       continue
1468     if rbd is None:
1469       result.append(None)
1470     else:
1471       result.append(rbd.GetActualSize())
1472   return result
1473
1474
1475 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1476   """Export a block device to a remote node.
1477
1478   @type disk: L{objects.Disk}
1479   @param disk: the description of the disk to export
1480   @type dest_node: str
1481   @param dest_node: the destination node to export to
1482   @type dest_path: str
1483   @param dest_path: the destination path on the target node
1484   @type cluster_name: str
1485   @param cluster_name: the cluster name, needed for SSH hostalias
1486   @rtype: None
1487
1488   """
1489   real_disk = _RecursiveFindBD(disk)
1490   if real_disk is None:
1491     _Fail("Block device '%s' is not set up", disk)
1492
1493   real_disk.Open()
1494
1495   # the block size on the read dd is 1MiB to match our units
1496   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1497                                "dd if=%s bs=1048576 count=%s",
1498                                real_disk.dev_path, str(disk.size))
1499
1500   # we set here a smaller block size as, due to ssh buffering, more
1501   # than 64-128k will mostly ignored; we use nocreat to fail if the
1502   # device is not already there or we pass a wrong path; we use
1503   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1504   # to not buffer too much memory; this means that at best, we flush
1505   # every 64k, which will not be very fast
1506   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1507                                 " oflag=dsync", dest_path)
1508
1509   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1510                                                    constants.GANETI_RUNAS,
1511                                                    destcmd)
1512
1513   # all commands have been checked, so we're safe to combine them
1514   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1515
1516   result = utils.RunCmd(["bash", "-c", command])
1517
1518   if result.failed:
1519     _Fail("Disk copy command '%s' returned error: %s"
1520           " output: %s", command, result.fail_reason, result.output)
1521
1522
1523 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1524   """Write a file to the filesystem.
1525
1526   This allows the master to overwrite(!) a file. It will only perform
1527   the operation if the file belongs to a list of configuration files.
1528
1529   @type file_name: str
1530   @param file_name: the target file name
1531   @type data: str
1532   @param data: the new contents of the file
1533   @type mode: int
1534   @param mode: the mode to give the file (can be None)
1535   @type uid: int
1536   @param uid: the owner of the file (can be -1 for default)
1537   @type gid: int
1538   @param gid: the group of the file (can be -1 for default)
1539   @type atime: float
1540   @param atime: the atime to set on the file (can be None)
1541   @type mtime: float
1542   @param mtime: the mtime to set on the file (can be None)
1543   @rtype: None
1544
1545   """
1546   if not os.path.isabs(file_name):
1547     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1548
1549   if file_name not in _ALLOWED_UPLOAD_FILES:
1550     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1551           file_name)
1552
1553   raw_data = _Decompress(data)
1554
1555   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1556                   atime=atime, mtime=mtime)
1557
1558
1559 def WriteSsconfFiles(values):
1560   """Update all ssconf files.
1561
1562   Wrapper around the SimpleStore.WriteFiles.
1563
1564   """
1565   ssconf.SimpleStore().WriteFiles(values)
1566
1567
1568 def _ErrnoOrStr(err):
1569   """Format an EnvironmentError exception.
1570
1571   If the L{err} argument has an errno attribute, it will be looked up
1572   and converted into a textual C{E...} description. Otherwise the
1573   string representation of the error will be returned.
1574
1575   @type err: L{EnvironmentError}
1576   @param err: the exception to format
1577
1578   """
1579   if hasattr(err, 'errno'):
1580     detail = errno.errorcode[err.errno]
1581   else:
1582     detail = str(err)
1583   return detail
1584
1585
1586 def _OSOndiskAPIVersion(name, os_dir):
1587   """Compute and return the API version of a given OS.
1588
1589   This function will try to read the API version of the OS given by
1590   the 'name' parameter and residing in the 'os_dir' directory.
1591
1592   @type name: str
1593   @param name: the OS name we should look for
1594   @type os_dir: str
1595   @param os_dir: the directory inwhich we should look for the OS
1596   @rtype: tuple
1597   @return: tuple (status, data) with status denoting the validity and
1598       data holding either the vaid versions or an error message
1599
1600   """
1601   api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
1602
1603   try:
1604     st = os.stat(api_file)
1605   except EnvironmentError, err:
1606     return False, ("Required file '%s' not found under path %s: %s" %
1607                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1608
1609   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1610     return False, ("File '%s' in %s is not a regular file" %
1611                    (constants.OS_API_FILE, os_dir))
1612
1613   try:
1614     api_versions = utils.ReadFile(api_file).splitlines()
1615   except EnvironmentError, err:
1616     return False, ("Error while reading the API version file at %s: %s" %
1617                    (api_file, _ErrnoOrStr(err)))
1618
1619   try:
1620     api_versions = [int(version.strip()) for version in api_versions]
1621   except (TypeError, ValueError), err:
1622     return False, ("API version(s) can't be converted to integer: %s" %
1623                    str(err))
1624
1625   return True, api_versions
1626
1627
1628 def DiagnoseOS(top_dirs=None):
1629   """Compute the validity for all OSes.
1630
1631   @type top_dirs: list
1632   @param top_dirs: the list of directories in which to
1633       search (if not given defaults to
1634       L{constants.OS_SEARCH_PATH})
1635   @rtype: list of L{objects.OS}
1636   @return: a list of tuples (name, path, status, diagnose)
1637       for all (potential) OSes under all search paths, where:
1638           - name is the (potential) OS name
1639           - path is the full path to the OS
1640           - status True/False is the validity of the OS
1641           - diagnose is the error message for an invalid OS, otherwise empty
1642
1643   """
1644   if top_dirs is None:
1645     top_dirs = constants.OS_SEARCH_PATH
1646
1647   result = []
1648   for dir_name in top_dirs:
1649     if os.path.isdir(dir_name):
1650       try:
1651         f_names = utils.ListVisibleFiles(dir_name)
1652       except EnvironmentError, err:
1653         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1654         break
1655       for name in f_names:
1656         os_path = os.path.sep.join([dir_name, name])
1657         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1658         if status:
1659           diagnose = ""
1660         else:
1661           diagnose = os_inst
1662         result.append((name, os_path, status, diagnose))
1663
1664   return result
1665
1666
1667 def _TryOSFromDisk(name, base_dir=None):
1668   """Create an OS instance from disk.
1669
1670   This function will return an OS instance if the given name is a
1671   valid OS name.
1672
1673   @type base_dir: string
1674   @keyword base_dir: Base directory containing OS installations.
1675                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1676   @rtype: tuple
1677   @return: success and either the OS instance if we find a valid one,
1678       or error message
1679
1680   """
1681   if base_dir is None:
1682     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1683     if os_dir is None:
1684       return False, "Directory for OS %s not found in search path" % name
1685   else:
1686     os_dir = os.path.sep.join([base_dir, name])
1687
1688   status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1689   if not status:
1690     # push the error up
1691     return status, api_versions
1692
1693   if not constants.OS_API_VERSIONS.intersection(api_versions):
1694     return False, ("API version mismatch for path '%s': found %s, want %s." %
1695                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1696
1697   # OS Files dictionary, we will populate it with the absolute path names
1698   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1699
1700   if max(api_versions) >= constants.OS_API_V15:
1701     os_files[constants.OS_VARIANTS_FILE] = ''
1702
1703   for name in os_files:
1704     os_files[name] = os.path.sep.join([os_dir, name])
1705
1706     try:
1707       st = os.stat(os_files[name])
1708     except EnvironmentError, err:
1709       return False, ("File '%s' under path '%s' is missing (%s)" %
1710                      (name, os_dir, _ErrnoOrStr(err)))
1711
1712     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1713       return False, ("File '%s' under path '%s' is not a regular file" %
1714                      (name, os_dir))
1715
1716     if name in constants.OS_SCRIPTS:
1717       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1718         return False, ("File '%s' under path '%s' is not executable" %
1719                        (name, os_dir))
1720
1721   variants = None
1722   if constants.OS_VARIANTS_FILE in os_files:
1723     variants_file = os_files[constants.OS_VARIANTS_FILE]
1724     try:
1725       variants = utils.ReadFile(variants_file).splitlines()
1726     except EnvironmentError, err:
1727       return False, ("Error while reading the OS variants file at %s: %s" %
1728                      (variants_file, _ErrnoOrStr(err)))
1729     if not variants:
1730       return False, ("No supported os variant found")
1731
1732   os_obj = objects.OS(name=name, path=os_dir,
1733                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1734                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1735                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1736                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1737                       supported_variants=variants,
1738                       api_versions=api_versions)
1739   return True, os_obj
1740
1741
1742 def OSFromDisk(name, base_dir=None):
1743   """Create an OS instance from disk.
1744
1745   This function will return an OS instance if the given name is a
1746   valid OS name. Otherwise, it will raise an appropriate
1747   L{RPCFail} exception, detailing why this is not a valid OS.
1748
1749   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1750   an exception but returns true/false status data.
1751
1752   @type base_dir: string
1753   @keyword base_dir: Base directory containing OS installations.
1754                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1755   @rtype: L{objects.OS}
1756   @return: the OS instance if we find a valid one
1757   @raise RPCFail: if we don't find a valid OS
1758
1759   """
1760   name_only = name.split('+',1)[0]
1761   status, payload = _TryOSFromDisk(name_only, base_dir)
1762
1763   if not status:
1764     _Fail(payload)
1765
1766   return payload
1767
1768
1769 def OSEnvironment(instance, os, debug=0):
1770   """Calculate the environment for an os script.
1771
1772   @type instance: L{objects.Instance}
1773   @param instance: target instance for the os script run
1774   @type os: L{objects.OS}
1775   @param os: operating system for which the environment is being built
1776   @type debug: integer
1777   @param debug: debug level (0 or 1, for OS Api 10)
1778   @rtype: dict
1779   @return: dict of environment variables
1780   @raise errors.BlockDeviceError: if the block device
1781       cannot be found
1782
1783   """
1784   result = {}
1785   api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
1786   result['OS_API_VERSION'] = '%d' % api_version
1787   result['INSTANCE_NAME'] = instance.name
1788   result['INSTANCE_OS'] = instance.os
1789   result['HYPERVISOR'] = instance.hypervisor
1790   result['DISK_COUNT'] = '%d' % len(instance.disks)
1791   result['NIC_COUNT'] = '%d' % len(instance.nics)
1792   result['DEBUG_LEVEL'] = '%d' % debug
1793   if api_version >= constants.OS_API_V15:
1794     try:
1795       variant = instance.os.split('+', 1)[1]
1796     except IndexError:
1797       variant = os.supported_variants[0]
1798     result['OS_VARIANT'] = variant
1799   for idx, disk in enumerate(instance.disks):
1800     real_disk = _RecursiveFindBD(disk)
1801     if real_disk is None:
1802       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1803                                     str(disk))
1804     real_disk.Open()
1805     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1806     result['DISK_%d_ACCESS' % idx] = disk.mode
1807     if constants.HV_DISK_TYPE in instance.hvparams:
1808       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1809         instance.hvparams[constants.HV_DISK_TYPE]
1810     if disk.dev_type in constants.LDS_BLOCK:
1811       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1812     elif disk.dev_type == constants.LD_FILE:
1813       result['DISK_%d_BACKEND_TYPE' % idx] = \
1814         'file:%s' % disk.physical_id[0]
1815   for idx, nic in enumerate(instance.nics):
1816     result['NIC_%d_MAC' % idx] = nic.mac
1817     if nic.ip:
1818       result['NIC_%d_IP' % idx] = nic.ip
1819     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1820     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1821       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1822     if nic.nicparams[constants.NIC_LINK]:
1823       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1824     if constants.HV_NIC_TYPE in instance.hvparams:
1825       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1826         instance.hvparams[constants.HV_NIC_TYPE]
1827
1828   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1829     for key, value in source.items():
1830       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1831
1832   return result
1833
1834 def BlockdevGrow(disk, amount):
1835   """Grow a stack of block devices.
1836
1837   This function is called recursively, with the childrens being the
1838   first ones to resize.
1839
1840   @type disk: L{objects.Disk}
1841   @param disk: the disk to be grown
1842   @rtype: (status, result)
1843   @return: a tuple with the status of the operation
1844       (True/False), and the errors message if status
1845       is False
1846
1847   """
1848   r_dev = _RecursiveFindBD(disk)
1849   if r_dev is None:
1850     _Fail("Cannot find block device %s", disk)
1851
1852   try:
1853     r_dev.Grow(amount)
1854   except errors.BlockDeviceError, err:
1855     _Fail("Failed to grow block device: %s", err, exc=True)
1856
1857
1858 def BlockdevSnapshot(disk):
1859   """Create a snapshot copy of a block device.
1860
1861   This function is called recursively, and the snapshot is actually created
1862   just for the leaf lvm backend device.
1863
1864   @type disk: L{objects.Disk}
1865   @param disk: the disk to be snapshotted
1866   @rtype: string
1867   @return: snapshot disk path
1868
1869   """
1870   if disk.children:
1871     if len(disk.children) == 1:
1872       # only one child, let's recurse on it
1873       return BlockdevSnapshot(disk.children[0])
1874     else:
1875       # more than one child, choose one that matches
1876       for child in disk.children:
1877         if child.size == disk.size:
1878           # return implies breaking the loop
1879           return BlockdevSnapshot(child)
1880   elif disk.dev_type == constants.LD_LV:
1881     r_dev = _RecursiveFindBD(disk)
1882     if r_dev is not None:
1883       # let's stay on the safe side and ask for the full size, for now
1884       return r_dev.Snapshot(disk.size)
1885     else:
1886       _Fail("Cannot find block device %s", disk)
1887   else:
1888     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1889           disk.unique_id, disk.dev_type)
1890
1891
1892 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1893   """Export a block device snapshot to a remote node.
1894
1895   @type disk: L{objects.Disk}
1896   @param disk: the description of the disk to export
1897   @type dest_node: str
1898   @param dest_node: the destination node to export to
1899   @type instance: L{objects.Instance}
1900   @param instance: the instance object to whom the disk belongs
1901   @type cluster_name: str
1902   @param cluster_name: the cluster name, needed for SSH hostalias
1903   @type idx: int
1904   @param idx: the index of the disk in the instance's disk list,
1905       used to export to the OS scripts environment
1906   @rtype: None
1907
1908   """
1909   inst_os = OSFromDisk(instance.os)
1910   export_env = OSEnvironment(instance, inst_os)
1911
1912   export_script = inst_os.export_script
1913
1914   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1915                                      instance.name, int(time.time()))
1916   if not os.path.exists(constants.LOG_OS_DIR):
1917     os.mkdir(constants.LOG_OS_DIR, 0750)
1918   real_disk = _RecursiveFindBD(disk)
1919   if real_disk is None:
1920     _Fail("Block device '%s' is not set up", disk)
1921
1922   real_disk.Open()
1923
1924   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1925   export_env['EXPORT_INDEX'] = str(idx)
1926
1927   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1928   destfile = disk.physical_id[1]
1929
1930   # the target command is built out of three individual commands,
1931   # which are joined by pipes; we check each individual command for
1932   # valid parameters
1933   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1934                                inst_os.path, export_script, logfile)
1935
1936   comprcmd = "gzip"
1937
1938   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1939                                 destdir, destdir, destfile)
1940   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1941                                                    constants.GANETI_RUNAS,
1942                                                    destcmd)
1943
1944   # all commands have been checked, so we're safe to combine them
1945   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1946
1947   result = utils.RunCmd(["bash", "-c", command], env=export_env)
1948
1949   if result.failed:
1950     _Fail("OS snapshot export command '%s' returned error: %s"
1951           " output: %s", command, result.fail_reason, result.output)
1952
1953
1954 def FinalizeExport(instance, snap_disks):
1955   """Write out the export configuration information.
1956
1957   @type instance: L{objects.Instance}
1958   @param instance: the instance which we export, used for
1959       saving configuration
1960   @type snap_disks: list of L{objects.Disk}
1961   @param snap_disks: list of snapshot block devices, which
1962       will be used to get the actual name of the dump file
1963
1964   @rtype: None
1965
1966   """
1967   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1968   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1969
1970   config = objects.SerializableConfigParser()
1971
1972   config.add_section(constants.INISECT_EXP)
1973   config.set(constants.INISECT_EXP, 'version', '0')
1974   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1975   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1976   config.set(constants.INISECT_EXP, 'os', instance.os)
1977   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1978
1979   config.add_section(constants.INISECT_INS)
1980   config.set(constants.INISECT_INS, 'name', instance.name)
1981   config.set(constants.INISECT_INS, 'memory', '%d' %
1982              instance.beparams[constants.BE_MEMORY])
1983   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1984              instance.beparams[constants.BE_VCPUS])
1985   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1986
1987   nic_total = 0
1988   for nic_count, nic in enumerate(instance.nics):
1989     nic_total += 1
1990     config.set(constants.INISECT_INS, 'nic%d_mac' %
1991                nic_count, '%s' % nic.mac)
1992     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1993     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1994                '%s' % nic.bridge)
1995   # TODO: redundant: on load can read nics until it doesn't exist
1996   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1997
1998   disk_total = 0
1999   for disk_count, disk in enumerate(snap_disks):
2000     if disk:
2001       disk_total += 1
2002       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2003                  ('%s' % disk.iv_name))
2004       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2005                  ('%s' % disk.physical_id[1]))
2006       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2007                  ('%d' % disk.size))
2008
2009   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2010
2011   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
2012                   data=config.Dumps())
2013   shutil.rmtree(finaldestdir, True)
2014   shutil.move(destdir, finaldestdir)
2015
2016
2017 def ExportInfo(dest):
2018   """Get export configuration information.
2019
2020   @type dest: str
2021   @param dest: directory containing the export
2022
2023   @rtype: L{objects.SerializableConfigParser}
2024   @return: a serializable config file containing the
2025       export info
2026
2027   """
2028   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
2029
2030   config = objects.SerializableConfigParser()
2031   config.read(cff)
2032
2033   if (not config.has_section(constants.INISECT_EXP) or
2034       not config.has_section(constants.INISECT_INS)):
2035     _Fail("Export info file doesn't have the required fields")
2036
2037   return config.Dumps()
2038
2039
2040 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2041   """Import an os image into an instance.
2042
2043   @type instance: L{objects.Instance}
2044   @param instance: instance to import the disks into
2045   @type src_node: string
2046   @param src_node: source node for the disk images
2047   @type src_images: list of string
2048   @param src_images: absolute paths of the disk images
2049   @rtype: list of boolean
2050   @return: each boolean represent the success of importing the n-th disk
2051
2052   """
2053   inst_os = OSFromDisk(instance.os)
2054   import_env = OSEnvironment(instance, inst_os)
2055   import_script = inst_os.import_script
2056
2057   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2058                                         instance.name, int(time.time()))
2059   if not os.path.exists(constants.LOG_OS_DIR):
2060     os.mkdir(constants.LOG_OS_DIR, 0750)
2061
2062   comprcmd = "gunzip"
2063   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2064                                import_script, logfile)
2065
2066   final_result = []
2067   for idx, image in enumerate(src_images):
2068     if image:
2069       destcmd = utils.BuildShellCmd('cat %s', image)
2070       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2071                                                        constants.GANETI_RUNAS,
2072                                                        destcmd)
2073       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2074       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2075       import_env['IMPORT_INDEX'] = str(idx)
2076       result = utils.RunCmd(command, env=import_env)
2077       if result.failed:
2078         logging.error("Disk import command '%s' returned error: %s"
2079                       " output: %s", command, result.fail_reason,
2080                       result.output)
2081         final_result.append("error importing disk %d: %s, %s" %
2082                             (idx, result.fail_reason, result.output[-100]))
2083
2084   if final_result:
2085     _Fail("; ".join(final_result), log=False)
2086
2087
2088 def ListExports():
2089   """Return a list of exports currently available on this machine.
2090
2091   @rtype: list
2092   @return: list of the exports
2093
2094   """
2095   if os.path.isdir(constants.EXPORT_DIR):
2096     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2097   else:
2098     _Fail("No exports directory")
2099
2100
2101 def RemoveExport(export):
2102   """Remove an existing export from the node.
2103
2104   @type export: str
2105   @param export: the name of the export to remove
2106   @rtype: None
2107
2108   """
2109   target = os.path.join(constants.EXPORT_DIR, export)
2110
2111   try:
2112     shutil.rmtree(target)
2113   except EnvironmentError, err:
2114     _Fail("Error while removing the export: %s", err, exc=True)
2115
2116
2117 def BlockdevRename(devlist):
2118   """Rename a list of block devices.
2119
2120   @type devlist: list of tuples
2121   @param devlist: list of tuples of the form  (disk,
2122       new_logical_id, new_physical_id); disk is an
2123       L{objects.Disk} object describing the current disk,
2124       and new logical_id/physical_id is the name we
2125       rename it to
2126   @rtype: boolean
2127   @return: True if all renames succeeded, False otherwise
2128
2129   """
2130   msgs = []
2131   result = True
2132   for disk, unique_id in devlist:
2133     dev = _RecursiveFindBD(disk)
2134     if dev is None:
2135       msgs.append("Can't find device %s in rename" % str(disk))
2136       result = False
2137       continue
2138     try:
2139       old_rpath = dev.dev_path
2140       dev.Rename(unique_id)
2141       new_rpath = dev.dev_path
2142       if old_rpath != new_rpath:
2143         DevCacheManager.RemoveCache(old_rpath)
2144         # FIXME: we should add the new cache information here, like:
2145         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2146         # but we don't have the owner here - maybe parse from existing
2147         # cache? for now, we only lose lvm data when we rename, which
2148         # is less critical than DRBD or MD
2149     except errors.BlockDeviceError, err:
2150       msgs.append("Can't rename device '%s' to '%s': %s" %
2151                   (dev, unique_id, err))
2152       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2153       result = False
2154   if not result:
2155     _Fail("; ".join(msgs))
2156
2157
2158 def _TransformFileStorageDir(file_storage_dir):
2159   """Checks whether given file_storage_dir is valid.
2160
2161   Checks wheter the given file_storage_dir is within the cluster-wide
2162   default file_storage_dir stored in SimpleStore. Only paths under that
2163   directory are allowed.
2164
2165   @type file_storage_dir: str
2166   @param file_storage_dir: the path to check
2167
2168   @return: the normalized path if valid, None otherwise
2169
2170   """
2171   cfg = _GetConfig()
2172   file_storage_dir = os.path.normpath(file_storage_dir)
2173   base_file_storage_dir = cfg.GetFileStorageDir()
2174   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2175       base_file_storage_dir):
2176     _Fail("File storage directory '%s' is not under base file"
2177           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2178   return file_storage_dir
2179
2180
2181 def CreateFileStorageDir(file_storage_dir):
2182   """Create file storage directory.
2183
2184   @type file_storage_dir: str
2185   @param file_storage_dir: directory to create
2186
2187   @rtype: tuple
2188   @return: tuple with first element a boolean indicating wheter dir
2189       creation was successful or not
2190
2191   """
2192   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2193   if os.path.exists(file_storage_dir):
2194     if not os.path.isdir(file_storage_dir):
2195       _Fail("Specified storage dir '%s' is not a directory",
2196             file_storage_dir)
2197   else:
2198     try:
2199       os.makedirs(file_storage_dir, 0750)
2200     except OSError, err:
2201       _Fail("Cannot create file storage directory '%s': %s",
2202             file_storage_dir, err, exc=True)
2203
2204
2205 def RemoveFileStorageDir(file_storage_dir):
2206   """Remove file storage directory.
2207
2208   Remove it only if it's empty. If not log an error and return.
2209
2210   @type file_storage_dir: str
2211   @param file_storage_dir: the directory we should cleanup
2212   @rtype: tuple (success,)
2213   @return: tuple of one element, C{success}, denoting
2214       whether the operation was successful
2215
2216   """
2217   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2218   if os.path.exists(file_storage_dir):
2219     if not os.path.isdir(file_storage_dir):
2220       _Fail("Specified Storage directory '%s' is not a directory",
2221             file_storage_dir)
2222     # deletes dir only if empty, otherwise we want to fail the rpc call
2223     try:
2224       os.rmdir(file_storage_dir)
2225     except OSError, err:
2226       _Fail("Cannot remove file storage directory '%s': %s",
2227             file_storage_dir, err)
2228
2229
2230 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2231   """Rename the file storage directory.
2232
2233   @type old_file_storage_dir: str
2234   @param old_file_storage_dir: the current path
2235   @type new_file_storage_dir: str
2236   @param new_file_storage_dir: the name we should rename to
2237   @rtype: tuple (success,)
2238   @return: tuple of one element, C{success}, denoting
2239       whether the operation was successful
2240
2241   """
2242   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2243   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2244   if not os.path.exists(new_file_storage_dir):
2245     if os.path.isdir(old_file_storage_dir):
2246       try:
2247         os.rename(old_file_storage_dir, new_file_storage_dir)
2248       except OSError, err:
2249         _Fail("Cannot rename '%s' to '%s': %s",
2250               old_file_storage_dir, new_file_storage_dir, err)
2251     else:
2252       _Fail("Specified storage dir '%s' is not a directory",
2253             old_file_storage_dir)
2254   else:
2255     if os.path.exists(old_file_storage_dir):
2256       _Fail("Cannot rename '%s' to '%s': both locations exist",
2257             old_file_storage_dir, new_file_storage_dir)
2258
2259
2260 def _EnsureJobQueueFile(file_name):
2261   """Checks whether the given filename is in the queue directory.
2262
2263   @type file_name: str
2264   @param file_name: the file name we should check
2265   @rtype: None
2266   @raises RPCFail: if the file is not valid
2267
2268   """
2269   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2270   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2271
2272   if not result:
2273     _Fail("Passed job queue file '%s' does not belong to"
2274           " the queue directory '%s'", file_name, queue_dir)
2275
2276
2277 def JobQueueUpdate(file_name, content):
2278   """Updates a file in the queue directory.
2279
2280   This is just a wrapper over L{utils.WriteFile}, with proper
2281   checking.
2282
2283   @type file_name: str
2284   @param file_name: the job file name
2285   @type content: str
2286   @param content: the new job contents
2287   @rtype: boolean
2288   @return: the success of the operation
2289
2290   """
2291   _EnsureJobQueueFile(file_name)
2292
2293   # Write and replace the file atomically
2294   utils.WriteFile(file_name, data=_Decompress(content))
2295
2296
2297 def JobQueueRename(old, new):
2298   """Renames a job queue file.
2299
2300   This is just a wrapper over os.rename with proper checking.
2301
2302   @type old: str
2303   @param old: the old (actual) file name
2304   @type new: str
2305   @param new: the desired file name
2306   @rtype: tuple
2307   @return: the success of the operation and payload
2308
2309   """
2310   _EnsureJobQueueFile(old)
2311   _EnsureJobQueueFile(new)
2312
2313   utils.RenameFile(old, new, mkdir=True)
2314
2315
2316 def JobQueueSetDrainFlag(drain_flag):
2317   """Set the drain flag for the queue.
2318
2319   This will set or unset the queue drain flag.
2320
2321   @type drain_flag: boolean
2322   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2323   @rtype: truple
2324   @return: always True, None
2325   @warning: the function always returns True
2326
2327   """
2328   if drain_flag:
2329     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2330   else:
2331     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2332
2333
2334 def BlockdevClose(instance_name, disks):
2335   """Closes the given block devices.
2336
2337   This means they will be switched to secondary mode (in case of
2338   DRBD).
2339
2340   @param instance_name: if the argument is not empty, the symlinks
2341       of this instance will be removed
2342   @type disks: list of L{objects.Disk}
2343   @param disks: the list of disks to be closed
2344   @rtype: tuple (success, message)
2345   @return: a tuple of success and message, where success
2346       indicates the succes of the operation, and message
2347       which will contain the error details in case we
2348       failed
2349
2350   """
2351   bdevs = []
2352   for cf in disks:
2353     rd = _RecursiveFindBD(cf)
2354     if rd is None:
2355       _Fail("Can't find device %s", cf)
2356     bdevs.append(rd)
2357
2358   msg = []
2359   for rd in bdevs:
2360     try:
2361       rd.Close()
2362     except errors.BlockDeviceError, err:
2363       msg.append(str(err))
2364   if msg:
2365     _Fail("Can't make devices secondary: %s", ",".join(msg))
2366   else:
2367     if instance_name:
2368       _RemoveBlockDevLinks(instance_name, disks)
2369
2370
2371 def ValidateHVParams(hvname, hvparams):
2372   """Validates the given hypervisor parameters.
2373
2374   @type hvname: string
2375   @param hvname: the hypervisor name
2376   @type hvparams: dict
2377   @param hvparams: the hypervisor parameters to be validated
2378   @rtype: None
2379
2380   """
2381   try:
2382     hv_type = hypervisor.GetHypervisor(hvname)
2383     hv_type.ValidateParameters(hvparams)
2384   except errors.HypervisorError, err:
2385     _Fail(str(err), log=False)
2386
2387
2388 def DemoteFromMC():
2389   """Demotes the current node from master candidate role.
2390
2391   """
2392   # try to ensure we're not the master by mistake
2393   master, myself = ssconf.GetMasterAndMyself()
2394   if master == myself:
2395     _Fail("ssconf status shows I'm the master node, will not demote")
2396   pid_file = utils.DaemonPidFileName(constants.MASTERD)
2397   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2398     _Fail("The master daemon is running, will not demote")
2399   try:
2400     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2401       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2402   except EnvironmentError, err:
2403     if err.errno != errno.ENOENT:
2404       _Fail("Error while backing up cluster file: %s", err, exc=True)
2405   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2406
2407
2408 def _FindDisks(nodes_ip, disks):
2409   """Sets the physical ID on disks and returns the block devices.
2410
2411   """
2412   # set the correct physical ID
2413   my_name = utils.HostInfo().name
2414   for cf in disks:
2415     cf.SetPhysicalID(my_name, nodes_ip)
2416
2417   bdevs = []
2418
2419   for cf in disks:
2420     rd = _RecursiveFindBD(cf)
2421     if rd is None:
2422       _Fail("Can't find device %s", cf)
2423     bdevs.append(rd)
2424   return bdevs
2425
2426
2427 def DrbdDisconnectNet(nodes_ip, disks):
2428   """Disconnects the network on a list of drbd devices.
2429
2430   """
2431   bdevs = _FindDisks(nodes_ip, disks)
2432
2433   # disconnect disks
2434   for rd in bdevs:
2435     try:
2436       rd.DisconnectNet()
2437     except errors.BlockDeviceError, err:
2438       _Fail("Can't change network configuration to standalone mode: %s",
2439             err, exc=True)
2440
2441
2442 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2443   """Attaches the network on a list of drbd devices.
2444
2445   """
2446   bdevs = _FindDisks(nodes_ip, disks)
2447
2448   if multimaster:
2449     for idx, rd in enumerate(bdevs):
2450       try:
2451         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2452       except EnvironmentError, err:
2453         _Fail("Can't create symlink: %s", err)
2454   # reconnect disks, switch to new master configuration and if
2455   # needed primary mode
2456   for rd in bdevs:
2457     try:
2458       rd.AttachNet(multimaster)
2459     except errors.BlockDeviceError, err:
2460       _Fail("Can't change network configuration: %s", err)
2461   # wait until the disks are connected; we need to retry the re-attach
2462   # if the device becomes standalone, as this might happen if the one
2463   # node disconnects and reconnects in a different mode before the
2464   # other node reconnects; in this case, one or both of the nodes will
2465   # decide it has wrong configuration and switch to standalone
2466   RECONNECT_TIMEOUT = 2 * 60
2467   sleep_time = 0.100 # start with 100 miliseconds
2468   timeout_limit = time.time() + RECONNECT_TIMEOUT
2469   while time.time() < timeout_limit:
2470     all_connected = True
2471     for rd in bdevs:
2472       stats = rd.GetProcStatus()
2473       if not (stats.is_connected or stats.is_in_resync):
2474         all_connected = False
2475       if stats.is_standalone:
2476         # peer had different config info and this node became
2477         # standalone, even though this should not happen with the
2478         # new staged way of changing disk configs
2479         try:
2480           rd.AttachNet(multimaster)
2481         except errors.BlockDeviceError, err:
2482           _Fail("Can't change network configuration: %s", err)
2483     if all_connected:
2484       break
2485     time.sleep(sleep_time)
2486     sleep_time = min(5, sleep_time * 1.5)
2487   if not all_connected:
2488     _Fail("Timeout in disk reconnecting")
2489   if multimaster:
2490     # change to primary mode
2491     for rd in bdevs:
2492       try:
2493         rd.Open()
2494       except errors.BlockDeviceError, err:
2495         _Fail("Can't change to primary mode: %s", err)
2496
2497
2498 def DrbdWaitSync(nodes_ip, disks):
2499   """Wait until DRBDs have synchronized.
2500
2501   """
2502   bdevs = _FindDisks(nodes_ip, disks)
2503
2504   min_resync = 100
2505   alldone = True
2506   for rd in bdevs:
2507     stats = rd.GetProcStatus()
2508     if not (stats.is_connected or stats.is_in_resync):
2509       _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2510     alldone = alldone and (not stats.is_in_resync)
2511     if stats.sync_percent is not None:
2512       min_resync = min(min_resync, stats.sync_percent)
2513
2514   return (alldone, min_resync)
2515
2516
2517 def PowercycleNode(hypervisor_type):
2518   """Hard-powercycle the node.
2519
2520   Because we need to return first, and schedule the powercycle in the
2521   background, we won't be able to report failures nicely.
2522
2523   """
2524   hyper = hypervisor.GetHypervisor(hypervisor_type)
2525   try:
2526     pid = os.fork()
2527   except OSError:
2528     # if we can't fork, we'll pretend that we're in the child process
2529     pid = 0
2530   if pid > 0:
2531     return "Reboot scheduled in 5 seconds"
2532   time.sleep(5)
2533   hyper.PowercycleNode()
2534
2535
2536 class HooksRunner(object):
2537   """Hook runner.
2538
2539   This class is instantiated on the node side (ganeti-noded) and not
2540   on the master side.
2541
2542   """
2543   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2544
2545   def __init__(self, hooks_base_dir=None):
2546     """Constructor for hooks runner.
2547
2548     @type hooks_base_dir: str or None
2549     @param hooks_base_dir: if not None, this overrides the
2550         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2551
2552     """
2553     if hooks_base_dir is None:
2554       hooks_base_dir = constants.HOOKS_BASE_DIR
2555     self._BASE_DIR = hooks_base_dir
2556
2557   @staticmethod
2558   def ExecHook(script, env):
2559     """Exec one hook script.
2560
2561     @type script: str
2562     @param script: the full path to the script
2563     @type env: dict
2564     @param env: the environment with which to exec the script
2565     @rtype: tuple (success, message)
2566     @return: a tuple of success and message, where success
2567         indicates the succes of the operation, and message
2568         which will contain the error details in case we
2569         failed
2570
2571     """
2572     # exec the process using subprocess and log the output
2573     fdstdin = None
2574     try:
2575       fdstdin = open("/dev/null", "r")
2576       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2577                                stderr=subprocess.STDOUT, close_fds=True,
2578                                shell=False, cwd="/", env=env)
2579       output = ""
2580       try:
2581         output = child.stdout.read(4096)
2582         child.stdout.close()
2583       except EnvironmentError, err:
2584         output += "Hook script error: %s" % str(err)
2585
2586       while True:
2587         try:
2588           result = child.wait()
2589           break
2590         except EnvironmentError, err:
2591           if err.errno == errno.EINTR:
2592             continue
2593           raise
2594     finally:
2595       # try not to leak fds
2596       for fd in (fdstdin, ):
2597         if fd is not None:
2598           try:
2599             fd.close()
2600           except EnvironmentError, err:
2601             # just log the error
2602             #logging.exception("Error while closing fd %s", fd)
2603             pass
2604
2605     return result == 0, utils.SafeEncode(output.strip())
2606
2607   def RunHooks(self, hpath, phase, env):
2608     """Run the scripts in the hooks directory.
2609
2610     @type hpath: str
2611     @param hpath: the path to the hooks directory which
2612         holds the scripts
2613     @type phase: str
2614     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2615         L{constants.HOOKS_PHASE_POST}
2616     @type env: dict
2617     @param env: dictionary with the environment for the hook
2618     @rtype: list
2619     @return: list of 3-element tuples:
2620       - script path
2621       - script result, either L{constants.HKR_SUCCESS} or
2622         L{constants.HKR_FAIL}
2623       - output of the script
2624
2625     @raise errors.ProgrammerError: for invalid input
2626         parameters
2627
2628     """
2629     if phase == constants.HOOKS_PHASE_PRE:
2630       suffix = "pre"
2631     elif phase == constants.HOOKS_PHASE_POST:
2632       suffix = "post"
2633     else:
2634       _Fail("Unknown hooks phase '%s'", phase)
2635
2636     rr = []
2637
2638     subdir = "%s-%s.d" % (hpath, suffix)
2639     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2640     try:
2641       dir_contents = utils.ListVisibleFiles(dir_name)
2642     except OSError:
2643       # FIXME: must log output in case of failures
2644       return rr
2645
2646     # we use the standard python sort order,
2647     # so 00name is the recommended naming scheme
2648     dir_contents.sort()
2649     for relname in dir_contents:
2650       fname = os.path.join(dir_name, relname)
2651       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2652           self.RE_MASK.match(relname) is not None):
2653         rrval = constants.HKR_SKIP
2654         output = ""
2655       else:
2656         result, output = self.ExecHook(fname, env)
2657         if not result:
2658           rrval = constants.HKR_FAIL
2659         else:
2660           rrval = constants.HKR_SUCCESS
2661       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2662
2663     return rr
2664
2665
2666 class IAllocatorRunner(object):
2667   """IAllocator runner.
2668
2669   This class is instantiated on the node side (ganeti-noded) and not on
2670   the master side.
2671
2672   """
2673   def Run(self, name, idata):
2674     """Run an iallocator script.
2675
2676     @type name: str
2677     @param name: the iallocator script name
2678     @type idata: str
2679     @param idata: the allocator input data
2680
2681     @rtype: tuple
2682     @return: two element tuple of:
2683        - status
2684        - either error message or stdout of allocator (for success)
2685
2686     """
2687     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2688                                   os.path.isfile)
2689     if alloc_script is None:
2690       _Fail("iallocator module '%s' not found in the search path", name)
2691
2692     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2693     try:
2694       os.write(fd, idata)
2695       os.close(fd)
2696       result = utils.RunCmd([alloc_script, fin_name])
2697       if result.failed:
2698         _Fail("iallocator module '%s' failed: %s, output '%s'",
2699               name, result.fail_reason, result.output)
2700     finally:
2701       os.unlink(fin_name)
2702
2703     return result.stdout
2704
2705
2706 class DevCacheManager(object):
2707   """Simple class for managing a cache of block device information.
2708
2709   """
2710   _DEV_PREFIX = "/dev/"
2711   _ROOT_DIR = constants.BDEV_CACHE_DIR
2712
2713   @classmethod
2714   def _ConvertPath(cls, dev_path):
2715     """Converts a /dev/name path to the cache file name.
2716
2717     This replaces slashes with underscores and strips the /dev
2718     prefix. It then returns the full path to the cache file.
2719
2720     @type dev_path: str
2721     @param dev_path: the C{/dev/} path name
2722     @rtype: str
2723     @return: the converted path name
2724
2725     """
2726     if dev_path.startswith(cls._DEV_PREFIX):
2727       dev_path = dev_path[len(cls._DEV_PREFIX):]
2728     dev_path = dev_path.replace("/", "_")
2729     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2730     return fpath
2731
2732   @classmethod
2733   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2734     """Updates the cache information for a given device.
2735
2736     @type dev_path: str
2737     @param dev_path: the pathname of the device
2738     @type owner: str
2739     @param owner: the owner (instance name) of the device
2740     @type on_primary: bool
2741     @param on_primary: whether this is the primary
2742         node nor not
2743     @type iv_name: str
2744     @param iv_name: the instance-visible name of the
2745         device, as in objects.Disk.iv_name
2746
2747     @rtype: None
2748
2749     """
2750     if dev_path is None:
2751       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2752       return
2753     fpath = cls._ConvertPath(dev_path)
2754     if on_primary:
2755       state = "primary"
2756     else:
2757       state = "secondary"
2758     if iv_name is None:
2759       iv_name = "not_visible"
2760     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2761     try:
2762       utils.WriteFile(fpath, data=fdata)
2763     except EnvironmentError, err:
2764       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2765
2766   @classmethod
2767   def RemoveCache(cls, dev_path):
2768     """Remove data for a dev_path.
2769
2770     This is just a wrapper over L{utils.RemoveFile} with a converted
2771     path name and logging.
2772
2773     @type dev_path: str
2774     @param dev_path: the pathname of the device
2775
2776     @rtype: None
2777
2778     """
2779     if dev_path is None:
2780       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2781       return
2782     fpath = cls._ConvertPath(dev_path)
2783     try:
2784       utils.RemoveFile(fpath)
2785     except EnvironmentError, err:
2786       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)