Unify the “--backend-parameters” option
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26
27 """
28
29
30 import os
31 import os.path
32 import shutil
33 import time
34 import stat
35 import errno
36 import re
37 import subprocess
38 import random
39 import logging
40 import tempfile
41 import zlib
42 import base64
43
44 from ganeti import errors
45 from ganeti import utils
46 from ganeti import ssh
47 from ganeti import hypervisor
48 from ganeti import constants
49 from ganeti import bdev
50 from ganeti import objects
51 from ganeti import ssconf
52
53
54 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55
56
57 class RPCFail(Exception):
58   """Class denoting RPC failure.
59
60   Its argument is the error message.
61
62   """
63
64
65 def _Fail(msg, *args, **kwargs):
66   """Log an error and the raise an RPCFail exception.
67
68   This exception is then handled specially in the ganeti daemon and
69   turned into a 'failed' return type. As such, this function is a
70   useful shortcut for logging the error and returning it to the master
71   daemon.
72
73   @type msg: string
74   @param msg: the text of the exception
75   @raise RPCFail
76
77   """
78   if args:
79     msg = msg % args
80   if "log" not in kwargs or kwargs["log"]: # if we should log this error
81     if "exc" in kwargs and kwargs["exc"]:
82       logging.exception(msg)
83     else:
84       logging.error(msg)
85   raise RPCFail(msg)
86
87
88 def _GetConfig():
89   """Simple wrapper to return a SimpleStore.
90
91   @rtype: L{ssconf.SimpleStore}
92   @return: a SimpleStore instance
93
94   """
95   return ssconf.SimpleStore()
96
97
98 def _GetSshRunner(cluster_name):
99   """Simple wrapper to return an SshRunner.
100
101   @type cluster_name: str
102   @param cluster_name: the cluster name, which is needed
103       by the SshRunner constructor
104   @rtype: L{ssh.SshRunner}
105   @return: an SshRunner instance
106
107   """
108   return ssh.SshRunner(cluster_name)
109
110
111 def _Decompress(data):
112   """Unpacks data compressed by the RPC client.
113
114   @type data: list or tuple
115   @param data: Data sent by RPC client
116   @rtype: str
117   @return: Decompressed data
118
119   """
120   assert isinstance(data, (list, tuple))
121   assert len(data) == 2
122   (encoding, content) = data
123   if encoding == constants.RPC_ENCODING_NONE:
124     return content
125   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
126     return zlib.decompress(base64.b64decode(content))
127   else:
128     raise AssertionError("Unknown data encoding")
129
130
131 def _CleanDirectory(path, exclude=None):
132   """Removes all regular files in a directory.
133
134   @type path: str
135   @param path: the directory to clean
136   @type exclude: list
137   @param exclude: list of files to be excluded, defaults
138       to the empty list
139
140   """
141   if not os.path.isdir(path):
142     return
143   if exclude is None:
144     exclude = []
145   else:
146     # Normalize excluded paths
147     exclude = [os.path.normpath(i) for i in exclude]
148
149   for rel_name in utils.ListVisibleFiles(path):
150     full_name = os.path.normpath(os.path.join(path, rel_name))
151     if full_name in exclude:
152       continue
153     if os.path.isfile(full_name) and not os.path.islink(full_name):
154       utils.RemoveFile(full_name)
155
156
157 def _BuildUploadFileList():
158   """Build the list of allowed upload files.
159
160   This is abstracted so that it's built only once at module import time.
161
162   """
163   allowed_files = set([
164     constants.CLUSTER_CONF_FILE,
165     constants.ETC_HOSTS,
166     constants.SSH_KNOWN_HOSTS_FILE,
167     constants.VNC_PASSWORD_FILE,
168     constants.RAPI_CERT_FILE,
169     constants.RAPI_USERS_FILE,
170     constants.HMAC_CLUSTER_KEY,
171     ])
172
173   for hv_name in constants.HYPER_TYPES:
174     hv_class = hypervisor.GetHypervisorClass(hv_name)
175     allowed_files.update(hv_class.GetAncillaryFiles())
176
177   return frozenset(allowed_files)
178
179
180 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181
182
183 def JobQueuePurge():
184   """Removes job queue files and archived jobs.
185
186   @rtype: tuple
187   @return: True, None
188
189   """
190   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
191   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192
193
194 def GetMasterInfo():
195   """Returns master information.
196
197   This is an utility function to compute master information, either
198   for consumption here or from the node daemon.
199
200   @rtype: tuple
201   @return: master_netdev, master_ip, master_name
202   @raise RPCFail: in case of errors
203
204   """
205   try:
206     cfg = _GetConfig()
207     master_netdev = cfg.GetMasterNetdev()
208     master_ip = cfg.GetMasterIP()
209     master_node = cfg.GetMasterNode()
210   except errors.ConfigurationError, err:
211     _Fail("Cluster configuration incomplete: %s", err, exc=True)
212   return (master_netdev, master_ip, master_node)
213
214
215 def StartMaster(start_daemons, no_voting):
216   """Activate local node as master node.
217
218   The function will always try activate the IP address of the master
219   (unless someone else has it). It will also start the master daemons,
220   based on the start_daemons parameter.
221
222   @type start_daemons: boolean
223   @param start_daemons: whether to also start the master
224       daemons (ganeti-masterd and ganeti-rapi)
225   @type no_voting: boolean
226   @param no_voting: whether to start ganeti-masterd without a node vote
227       (if start_daemons is True), but still non-interactively
228   @rtype: None
229
230   """
231   # GetMasterInfo will raise an exception if not able to return data
232   master_netdev, master_ip, _ = GetMasterInfo()
233
234   err_msgs = []
235   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
236     if utils.OwnIpAddress(master_ip):
237       # we already have the ip:
238       logging.debug("Master IP already configured, doing nothing")
239     else:
240       msg = "Someone else has the master ip, not activating"
241       logging.error(msg)
242       err_msgs.append(msg)
243   else:
244     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
245                            "dev", master_netdev, "label",
246                            "%s:0" % master_netdev])
247     if result.failed:
248       msg = "Can't activate master IP: %s" % result.output
249       logging.error(msg)
250       err_msgs.append(msg)
251
252     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
253                            "-s", master_ip, master_ip])
254     # we'll ignore the exit code of arping
255
256   # and now start the master and rapi daemons
257   if start_daemons:
258     daemons_params = {
259         'ganeti-masterd': [],
260         'ganeti-rapi': [],
261         }
262     if no_voting:
263       daemons_params['ganeti-masterd'].append('--no-voting')
264       daemons_params['ganeti-masterd'].append('--yes-do-it')
265     for daemon in daemons_params:
266       cmd = [daemon]
267       cmd.extend(daemons_params[daemon])
268       result = utils.RunCmd(cmd)
269       if result.failed:
270         msg = "Can't start daemon %s: %s" % (daemon, result.output)
271         logging.error(msg)
272         err_msgs.append(msg)
273
274   if err_msgs:
275     _Fail("; ".join(err_msgs))
276
277
278 def StopMaster(stop_daemons):
279   """Deactivate this node as master.
280
281   The function will always try to deactivate the IP address of the
282   master. It will also stop the master daemons depending on the
283   stop_daemons parameter.
284
285   @type stop_daemons: boolean
286   @param stop_daemons: whether to also stop the master daemons
287       (ganeti-masterd and ganeti-rapi)
288   @rtype: None
289
290   """
291   # TODO: log and report back to the caller the error failures; we
292   # need to decide in which case we fail the RPC for this
293
294   # GetMasterInfo will raise an exception if not able to return data
295   master_netdev, master_ip, _ = GetMasterInfo()
296
297   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
298                          "dev", master_netdev])
299   if result.failed:
300     logging.error("Can't remove the master IP, error: %s", result.output)
301     # but otherwise ignore the failure
302
303   if stop_daemons:
304     # stop/kill the rapi and the master daemon
305     for daemon in constants.RAPI, constants.MASTERD:
306       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307
308
309 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
310   """Joins this node to the cluster.
311
312   This does the following:
313       - updates the hostkeys of the machine (rsa and dsa)
314       - adds the ssh private key to the user
315       - adds the ssh public key to the users' authorized_keys file
316
317   @type dsa: str
318   @param dsa: the DSA private key to write
319   @type dsapub: str
320   @param dsapub: the DSA public key to write
321   @type rsa: str
322   @param rsa: the RSA private key to write
323   @type rsapub: str
324   @param rsapub: the RSA public key to write
325   @type sshkey: str
326   @param sshkey: the SSH private key to write
327   @type sshpub: str
328   @param sshpub: the SSH public key to write
329   @rtype: boolean
330   @return: the success of the operation
331
332   """
333   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
334                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
335                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
336                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
337   for name, content, mode in sshd_keys:
338     utils.WriteFile(name, data=content, mode=mode)
339
340   try:
341     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
342                                                     mkdir=True)
343   except errors.OpExecError, err:
344     _Fail("Error while processing user ssh files: %s", err, exc=True)
345
346   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
347     utils.WriteFile(name, data=content, mode=0600)
348
349   utils.AddAuthorizedKey(auth_keys, sshpub)
350
351   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352
353
354 def LeaveCluster():
355   """Cleans up and remove the current node.
356
357   This function cleans up and prepares the current node to be removed
358   from the cluster.
359
360   If processing is successful, then it raises an
361   L{errors.QuitGanetiException} which is used as a special case to
362   shutdown the node daemon.
363
364   """
365   _CleanDirectory(constants.DATA_DIR)
366   JobQueuePurge()
367
368   try:
369     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
370
371     utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
372
373     utils.RemoveFile(priv_key)
374     utils.RemoveFile(pub_key)
375   except errors.OpExecError:
376     logging.exception("Error while processing ssh files")
377
378   # Raise a custom exception (handled in ganeti-noded)
379   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
380
381
382 def GetNodeInfo(vgname, hypervisor_type):
383   """Gives back a hash with different information about the node.
384
385   @type vgname: C{string}
386   @param vgname: the name of the volume group to ask for disk space information
387   @type hypervisor_type: C{str}
388   @param hypervisor_type: the name of the hypervisor to ask for
389       memory information
390   @rtype: C{dict}
391   @return: dictionary with the following keys:
392       - vg_size is the size of the configured volume group in MiB
393       - vg_free is the free size of the volume group in MiB
394       - memory_dom0 is the memory allocated for domain0 in MiB
395       - memory_free is the currently available (free) ram in MiB
396       - memory_total is the total number of ram in MiB
397
398   """
399   outputarray = {}
400   vginfo = _GetVGInfo(vgname)
401   outputarray['vg_size'] = vginfo['vg_size']
402   outputarray['vg_free'] = vginfo['vg_free']
403
404   hyper = hypervisor.GetHypervisor(hypervisor_type)
405   hyp_info = hyper.GetNodeInfo()
406   if hyp_info is not None:
407     outputarray.update(hyp_info)
408
409   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
410
411   return outputarray
412
413
414 def VerifyNode(what, cluster_name):
415   """Verify the status of the local node.
416
417   Based on the input L{what} parameter, various checks are done on the
418   local node.
419
420   If the I{filelist} key is present, this list of
421   files is checksummed and the file/checksum pairs are returned.
422
423   If the I{nodelist} key is present, we check that we have
424   connectivity via ssh with the target nodes (and check the hostname
425   report).
426
427   If the I{node-net-test} key is present, we check that we have
428   connectivity to the given nodes via both primary IP and, if
429   applicable, secondary IPs.
430
431   @type what: C{dict}
432   @param what: a dictionary of things to check:
433       - filelist: list of files for which to compute checksums
434       - nodelist: list of nodes we should check ssh communication with
435       - node-net-test: list of nodes we should check node daemon port
436         connectivity with
437       - hypervisor: list with hypervisors to run the verify for
438   @rtype: dict
439   @return: a dictionary with the same keys as the input dict, and
440       values representing the result of the checks
441
442   """
443   result = {}
444
445   if constants.NV_HYPERVISOR in what:
446     result[constants.NV_HYPERVISOR] = tmp = {}
447     for hv_name in what[constants.NV_HYPERVISOR]:
448       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
449
450   if constants.NV_FILELIST in what:
451     result[constants.NV_FILELIST] = utils.FingerprintFiles(
452       what[constants.NV_FILELIST])
453
454   if constants.NV_NODELIST in what:
455     result[constants.NV_NODELIST] = tmp = {}
456     random.shuffle(what[constants.NV_NODELIST])
457     for node in what[constants.NV_NODELIST]:
458       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
459       if not success:
460         tmp[node] = message
461
462   if constants.NV_NODENETTEST in what:
463     result[constants.NV_NODENETTEST] = tmp = {}
464     my_name = utils.HostInfo().name
465     my_pip = my_sip = None
466     for name, pip, sip in what[constants.NV_NODENETTEST]:
467       if name == my_name:
468         my_pip = pip
469         my_sip = sip
470         break
471     if not my_pip:
472       tmp[my_name] = ("Can't find my own primary/secondary IP"
473                       " in the node list")
474     else:
475       port = utils.GetDaemonPort(constants.NODED)
476       for name, pip, sip in what[constants.NV_NODENETTEST]:
477         fail = []
478         if not utils.TcpPing(pip, port, source=my_pip):
479           fail.append("primary")
480         if sip != pip:
481           if not utils.TcpPing(sip, port, source=my_sip):
482             fail.append("secondary")
483         if fail:
484           tmp[name] = ("failure using the %s interface(s)" %
485                        " and ".join(fail))
486
487   if constants.NV_LVLIST in what:
488     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
489
490   if constants.NV_INSTANCELIST in what:
491     result[constants.NV_INSTANCELIST] = GetInstanceList(
492       what[constants.NV_INSTANCELIST])
493
494   if constants.NV_VGLIST in what:
495     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
496
497   if constants.NV_VERSION in what:
498     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
499                                     constants.RELEASE_VERSION)
500
501   if constants.NV_HVINFO in what:
502     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
503     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
504
505   if constants.NV_DRBDLIST in what:
506     try:
507       used_minors = bdev.DRBD8.GetUsedDevs().keys()
508     except errors.BlockDeviceError, err:
509       logging.warning("Can't get used minors list", exc_info=True)
510       used_minors = str(err)
511     result[constants.NV_DRBDLIST] = used_minors
512
513   return result
514
515
516 def GetVolumeList(vg_name):
517   """Compute list of logical volumes and their size.
518
519   @type vg_name: str
520   @param vg_name: the volume group whose LVs we should list
521   @rtype: dict
522   @return:
523       dictionary of all partions (key) with value being a tuple of
524       their size (in MiB), inactive and online status::
525
526         {'test1': ('20.06', True, True)}
527
528       in case of errors, a string is returned with the error
529       details.
530
531   """
532   lvs = {}
533   sep = '|'
534   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
535                          "--separator=%s" % sep,
536                          "-olv_name,lv_size,lv_attr", vg_name])
537   if result.failed:
538     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
539
540   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
541   for line in result.stdout.splitlines():
542     line = line.strip()
543     match = valid_line_re.match(line)
544     if not match:
545       logging.error("Invalid line returned from lvs output: '%s'", line)
546       continue
547     name, size, attr = match.groups()
548     inactive = attr[4] == '-'
549     online = attr[5] == 'o'
550     virtual = attr[0] == 'v'
551     if virtual:
552       # we don't want to report such volumes as existing, since they
553       # don't really hold data
554       continue
555     lvs[name] = (size, inactive, online)
556
557   return lvs
558
559
560 def ListVolumeGroups():
561   """List the volume groups and their size.
562
563   @rtype: dict
564   @return: dictionary with keys volume name and values the
565       size of the volume
566
567   """
568   return utils.ListVolumeGroups()
569
570
571 def NodeVolumes():
572   """List all volumes on this node.
573
574   @rtype: list
575   @return:
576     A list of dictionaries, each having four keys:
577       - name: the logical volume name,
578       - size: the size of the logical volume
579       - dev: the physical device on which the LV lives
580       - vg: the volume group to which it belongs
581
582     In case of errors, we return an empty list and log the
583     error.
584
585     Note that since a logical volume can live on multiple physical
586     volumes, the resulting list might include a logical volume
587     multiple times.
588
589   """
590   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
591                          "--separator=|",
592                          "--options=lv_name,lv_size,devices,vg_name"])
593   if result.failed:
594     _Fail("Failed to list logical volumes, lvs output: %s",
595           result.output)
596
597   def parse_dev(dev):
598     if '(' in dev:
599       return dev.split('(')[0]
600     else:
601       return dev
602
603   def map_line(line):
604     return {
605       'name': line[0].strip(),
606       'size': line[1].strip(),
607       'dev': parse_dev(line[2].strip()),
608       'vg': line[3].strip(),
609     }
610
611   return [map_line(line.split('|')) for line in result.stdout.splitlines()
612           if line.count('|') >= 3]
613
614
615 def BridgesExist(bridges_list):
616   """Check if a list of bridges exist on the current node.
617
618   @rtype: boolean
619   @return: C{True} if all of them exist, C{False} otherwise
620
621   """
622   missing = []
623   for bridge in bridges_list:
624     if not utils.BridgeExists(bridge):
625       missing.append(bridge)
626
627   if missing:
628     _Fail("Missing bridges %s", ", ".join(missing))
629
630
631 def GetInstanceList(hypervisor_list):
632   """Provides a list of instances.
633
634   @type hypervisor_list: list
635   @param hypervisor_list: the list of hypervisors to query information
636
637   @rtype: list
638   @return: a list of all running instances on the current node
639     - instance1.example.com
640     - instance2.example.com
641
642   """
643   results = []
644   for hname in hypervisor_list:
645     try:
646       names = hypervisor.GetHypervisor(hname).ListInstances()
647       results.extend(names)
648     except errors.HypervisorError, err:
649       _Fail("Error enumerating instances (hypervisor %s): %s",
650             hname, err, exc=True)
651
652   return results
653
654
655 def GetInstanceInfo(instance, hname):
656   """Gives back the information about an instance as a dictionary.
657
658   @type instance: string
659   @param instance: the instance name
660   @type hname: string
661   @param hname: the hypervisor type of the instance
662
663   @rtype: dict
664   @return: dictionary with the following keys:
665       - memory: memory size of instance (int)
666       - state: xen state of instance (string)
667       - time: cpu time of instance (float)
668
669   """
670   output = {}
671
672   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
673   if iinfo is not None:
674     output['memory'] = iinfo[2]
675     output['state'] = iinfo[4]
676     output['time'] = iinfo[5]
677
678   return output
679
680
681 def GetInstanceMigratable(instance):
682   """Gives whether an instance can be migrated.
683
684   @type instance: L{objects.Instance}
685   @param instance: object representing the instance to be checked.
686
687   @rtype: tuple
688   @return: tuple of (result, description) where:
689       - result: whether the instance can be migrated or not
690       - description: a description of the issue, if relevant
691
692   """
693   hyper = hypervisor.GetHypervisor(instance.hypervisor)
694   iname = instance.name
695   if iname not in hyper.ListInstances():
696     _Fail("Instance %s is not running", iname)
697
698   for idx in range(len(instance.disks)):
699     link_name = _GetBlockDevSymlinkPath(iname, idx)
700     if not os.path.islink(link_name):
701       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
702
703
704 def GetAllInstancesInfo(hypervisor_list):
705   """Gather data about all instances.
706
707   This is the equivalent of L{GetInstanceInfo}, except that it
708   computes data for all instances at once, thus being faster if one
709   needs data about more than one instance.
710
711   @type hypervisor_list: list
712   @param hypervisor_list: list of hypervisors to query for instance data
713
714   @rtype: dict
715   @return: dictionary of instance: data, with data having the following keys:
716       - memory: memory size of instance (int)
717       - state: xen state of instance (string)
718       - time: cpu time of instance (float)
719       - vcpus: the number of vcpus
720
721   """
722   output = {}
723
724   for hname in hypervisor_list:
725     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
726     if iinfo:
727       for name, _, memory, vcpus, state, times in iinfo:
728         value = {
729           'memory': memory,
730           'vcpus': vcpus,
731           'state': state,
732           'time': times,
733           }
734         if name in output:
735           # we only check static parameters, like memory and vcpus,
736           # and not state and time which can change between the
737           # invocations of the different hypervisors
738           for key in 'memory', 'vcpus':
739             if value[key] != output[name][key]:
740               _Fail("Instance %s is running twice"
741                     " with different parameters", name)
742         output[name] = value
743
744   return output
745
746
747 def InstanceOsAdd(instance, reinstall):
748   """Add an OS to an instance.
749
750   @type instance: L{objects.Instance}
751   @param instance: Instance whose OS is to be installed
752   @type reinstall: boolean
753   @param reinstall: whether this is an instance reinstall
754   @rtype: None
755
756   """
757   inst_os = OSFromDisk(instance.os)
758
759   create_env = OSEnvironment(instance, inst_os)
760   if reinstall:
761     create_env['INSTANCE_REINSTALL'] = "1"
762
763   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
764                                      instance.name, int(time.time()))
765
766   result = utils.RunCmd([inst_os.create_script], env=create_env,
767                         cwd=inst_os.path, output=logfile,)
768   if result.failed:
769     logging.error("os create command '%s' returned error: %s, logfile: %s,"
770                   " output: %s", result.cmd, result.fail_reason, logfile,
771                   result.output)
772     lines = [utils.SafeEncode(val)
773              for val in utils.TailFile(logfile, lines=20)]
774     _Fail("OS create script failed (%s), last lines in the"
775           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
776
777
778 def RunRenameInstance(instance, old_name):
779   """Run the OS rename script for an instance.
780
781   @type instance: L{objects.Instance}
782   @param instance: Instance whose OS is to be installed
783   @type old_name: string
784   @param old_name: previous instance name
785   @rtype: boolean
786   @return: the success of the operation
787
788   """
789   inst_os = OSFromDisk(instance.os)
790
791   rename_env = OSEnvironment(instance, inst_os)
792   rename_env['OLD_INSTANCE_NAME'] = old_name
793
794   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
795                                            old_name,
796                                            instance.name, int(time.time()))
797
798   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
799                         cwd=inst_os.path, output=logfile)
800
801   if result.failed:
802     logging.error("os create command '%s' returned error: %s output: %s",
803                   result.cmd, result.fail_reason, result.output)
804     lines = [utils.SafeEncode(val)
805              for val in utils.TailFile(logfile, lines=20)]
806     _Fail("OS rename script failed (%s), last lines in the"
807           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
808
809
810 def _GetVGInfo(vg_name):
811   """Get information about the volume group.
812
813   @type vg_name: str
814   @param vg_name: the volume group which we query
815   @rtype: dict
816   @return:
817     A dictionary with the following keys:
818       - C{vg_size} is the total size of the volume group in MiB
819       - C{vg_free} is the free size of the volume group in MiB
820       - C{pv_count} are the number of physical disks in that VG
821
822     If an error occurs during gathering of data, we return the same dict
823     with keys all set to None.
824
825   """
826   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
827
828   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
829                          "--nosuffix", "--units=m", "--separator=:", vg_name])
830
831   if retval.failed:
832     logging.error("volume group %s not present", vg_name)
833     return retdic
834   valarr = retval.stdout.strip().rstrip(':').split(':')
835   if len(valarr) == 3:
836     try:
837       retdic = {
838         "vg_size": int(round(float(valarr[0]), 0)),
839         "vg_free": int(round(float(valarr[1]), 0)),
840         "pv_count": int(valarr[2]),
841         }
842     except ValueError, err:
843       logging.exception("Fail to parse vgs output: %s", err)
844   else:
845     logging.error("vgs output has the wrong number of fields (expected"
846                   " three): %s", str(valarr))
847   return retdic
848
849
850 def _GetBlockDevSymlinkPath(instance_name, idx):
851   return os.path.join(constants.DISK_LINKS_DIR,
852                       "%s:%d" % (instance_name, idx))
853
854
855 def _SymlinkBlockDev(instance_name, device_path, idx):
856   """Set up symlinks to a instance's block device.
857
858   This is an auxiliary function run when an instance is start (on the primary
859   node) or when an instance is migrated (on the target node).
860
861
862   @param instance_name: the name of the target instance
863   @param device_path: path of the physical block device, on the node
864   @param idx: the disk index
865   @return: absolute path to the disk's symlink
866
867   """
868   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
869   try:
870     os.symlink(device_path, link_name)
871   except OSError, err:
872     if err.errno == errno.EEXIST:
873       if (not os.path.islink(link_name) or
874           os.readlink(link_name) != device_path):
875         os.remove(link_name)
876         os.symlink(device_path, link_name)
877     else:
878       raise
879
880   return link_name
881
882
883 def _RemoveBlockDevLinks(instance_name, disks):
884   """Remove the block device symlinks belonging to the given instance.
885
886   """
887   for idx, _ in enumerate(disks):
888     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
889     if os.path.islink(link_name):
890       try:
891         os.remove(link_name)
892       except OSError:
893         logging.exception("Can't remove symlink '%s'", link_name)
894
895
896 def _GatherAndLinkBlockDevs(instance):
897   """Set up an instance's block device(s).
898
899   This is run on the primary node at instance startup. The block
900   devices must be already assembled.
901
902   @type instance: L{objects.Instance}
903   @param instance: the instance whose disks we shoul assemble
904   @rtype: list
905   @return: list of (disk_object, device_path)
906
907   """
908   block_devices = []
909   for idx, disk in enumerate(instance.disks):
910     device = _RecursiveFindBD(disk)
911     if device is None:
912       raise errors.BlockDeviceError("Block device '%s' is not set up." %
913                                     str(disk))
914     device.Open()
915     try:
916       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
917     except OSError, e:
918       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
919                                     e.strerror)
920
921     block_devices.append((disk, link_name))
922
923   return block_devices
924
925
926 def StartInstance(instance):
927   """Start an instance.
928
929   @type instance: L{objects.Instance}
930   @param instance: the instance object
931   @rtype: None
932
933   """
934   running_instances = GetInstanceList([instance.hypervisor])
935
936   if instance.name in running_instances:
937     logging.info("Instance %s already running, not starting", instance.name)
938     return
939
940   try:
941     block_devices = _GatherAndLinkBlockDevs(instance)
942     hyper = hypervisor.GetHypervisor(instance.hypervisor)
943     hyper.StartInstance(instance, block_devices)
944   except errors.BlockDeviceError, err:
945     _Fail("Block device error: %s", err, exc=True)
946   except errors.HypervisorError, err:
947     _RemoveBlockDevLinks(instance.name, instance.disks)
948     _Fail("Hypervisor error: %s", err, exc=True)
949
950
951 def InstanceShutdown(instance):
952   """Shut an instance down.
953
954   @note: this functions uses polling with a hardcoded timeout.
955
956   @type instance: L{objects.Instance}
957   @param instance: the instance object
958   @rtype: None
959
960   """
961   hv_name = instance.hypervisor
962   running_instances = GetInstanceList([hv_name])
963   iname = instance.name
964
965   if iname not in running_instances:
966     logging.info("Instance %s not running, doing nothing", iname)
967     return
968
969   hyper = hypervisor.GetHypervisor(hv_name)
970   try:
971     hyper.StopInstance(instance)
972   except errors.HypervisorError, err:
973     _Fail("Failed to stop instance %s: %s", iname, err)
974
975   # test every 10secs for 2min
976
977   time.sleep(1)
978   for _ in range(11):
979     if instance.name not in GetInstanceList([hv_name]):
980       break
981     time.sleep(10)
982   else:
983     # the shutdown did not succeed
984     logging.error("Shutdown of '%s' unsuccessful, using destroy", iname)
985
986     try:
987       hyper.StopInstance(instance, force=True)
988     except errors.HypervisorError, err:
989       _Fail("Failed to force stop instance %s: %s", iname, err)
990
991     time.sleep(1)
992     if instance.name in GetInstanceList([hv_name]):
993       _Fail("Could not shutdown instance %s even by destroy", iname)
994
995   _RemoveBlockDevLinks(iname, instance.disks)
996
997
998 def InstanceReboot(instance, reboot_type):
999   """Reboot an instance.
1000
1001   @type instance: L{objects.Instance}
1002   @param instance: the instance object to reboot
1003   @type reboot_type: str
1004   @param reboot_type: the type of reboot, one the following
1005     constants:
1006       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1007         instance OS, do not recreate the VM
1008       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1009         restart the VM (at the hypervisor level)
1010       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1011         not accepted here, since that mode is handled differently, in
1012         cmdlib, and translates into full stop and start of the
1013         instance (instead of a call_instance_reboot RPC)
1014   @rtype: None
1015
1016   """
1017   running_instances = GetInstanceList([instance.hypervisor])
1018
1019   if instance.name not in running_instances:
1020     _Fail("Cannot reboot instance %s that is not running", instance.name)
1021
1022   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1023   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1024     try:
1025       hyper.RebootInstance(instance)
1026     except errors.HypervisorError, err:
1027       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1028   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1029     try:
1030       InstanceShutdown(instance)
1031       return StartInstance(instance)
1032     except errors.HypervisorError, err:
1033       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1034   else:
1035     _Fail("Invalid reboot_type received: %s", reboot_type)
1036
1037
1038 def MigrationInfo(instance):
1039   """Gather information about an instance to be migrated.
1040
1041   @type instance: L{objects.Instance}
1042   @param instance: the instance definition
1043
1044   """
1045   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1046   try:
1047     info = hyper.MigrationInfo(instance)
1048   except errors.HypervisorError, err:
1049     _Fail("Failed to fetch migration information: %s", err, exc=True)
1050   return info
1051
1052
1053 def AcceptInstance(instance, info, target):
1054   """Prepare the node to accept an instance.
1055
1056   @type instance: L{objects.Instance}
1057   @param instance: the instance definition
1058   @type info: string/data (opaque)
1059   @param info: migration information, from the source node
1060   @type target: string
1061   @param target: target host (usually ip), on this node
1062
1063   """
1064   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1065   try:
1066     hyper.AcceptInstance(instance, info, target)
1067   except errors.HypervisorError, err:
1068     _Fail("Failed to accept instance: %s", err, exc=True)
1069
1070
1071 def FinalizeMigration(instance, info, success):
1072   """Finalize any preparation to accept an instance.
1073
1074   @type instance: L{objects.Instance}
1075   @param instance: the instance definition
1076   @type info: string/data (opaque)
1077   @param info: migration information, from the source node
1078   @type success: boolean
1079   @param success: whether the migration was a success or a failure
1080
1081   """
1082   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1083   try:
1084     hyper.FinalizeMigration(instance, info, success)
1085   except errors.HypervisorError, err:
1086     _Fail("Failed to finalize migration: %s", err, exc=True)
1087
1088
1089 def MigrateInstance(instance, target, live):
1090   """Migrates an instance to another node.
1091
1092   @type instance: L{objects.Instance}
1093   @param instance: the instance definition
1094   @type target: string
1095   @param target: the target node name
1096   @type live: boolean
1097   @param live: whether the migration should be done live or not (the
1098       interpretation of this parameter is left to the hypervisor)
1099   @rtype: tuple
1100   @return: a tuple of (success, msg) where:
1101       - succes is a boolean denoting the success/failure of the operation
1102       - msg is a string with details in case of failure
1103
1104   """
1105   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1106
1107   try:
1108     hyper.MigrateInstance(instance.name, target, live)
1109   except errors.HypervisorError, err:
1110     _Fail("Failed to migrate instance: %s", err, exc=True)
1111
1112
1113 def BlockdevCreate(disk, size, owner, on_primary, info):
1114   """Creates a block device for an instance.
1115
1116   @type disk: L{objects.Disk}
1117   @param disk: the object describing the disk we should create
1118   @type size: int
1119   @param size: the size of the physical underlying device, in MiB
1120   @type owner: str
1121   @param owner: the name of the instance for which disk is created,
1122       used for device cache data
1123   @type on_primary: boolean
1124   @param on_primary:  indicates if it is the primary node or not
1125   @type info: string
1126   @param info: string that will be sent to the physical device
1127       creation, used for example to set (LVM) tags on LVs
1128
1129   @return: the new unique_id of the device (this can sometime be
1130       computed only after creation), or None. On secondary nodes,
1131       it's not required to return anything.
1132
1133   """
1134   clist = []
1135   if disk.children:
1136     for child in disk.children:
1137       try:
1138         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1139       except errors.BlockDeviceError, err:
1140         _Fail("Can't assemble device %s: %s", child, err)
1141       if on_primary or disk.AssembleOnSecondary():
1142         # we need the children open in case the device itself has to
1143         # be assembled
1144         try:
1145           crdev.Open()
1146         except errors.BlockDeviceError, err:
1147           _Fail("Can't make child '%s' read-write: %s", child, err)
1148       clist.append(crdev)
1149
1150   try:
1151     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1152   except errors.BlockDeviceError, err:
1153     _Fail("Can't create block device: %s", err)
1154
1155   if on_primary or disk.AssembleOnSecondary():
1156     try:
1157       device.Assemble()
1158     except errors.BlockDeviceError, err:
1159       _Fail("Can't assemble device after creation, unusual event: %s", err)
1160     device.SetSyncSpeed(constants.SYNC_SPEED)
1161     if on_primary or disk.OpenOnSecondary():
1162       try:
1163         device.Open(force=True)
1164       except errors.BlockDeviceError, err:
1165         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1166     DevCacheManager.UpdateCache(device.dev_path, owner,
1167                                 on_primary, disk.iv_name)
1168
1169   device.SetInfo(info)
1170
1171   return device.unique_id
1172
1173
1174 def BlockdevRemove(disk):
1175   """Remove a block device.
1176
1177   @note: This is intended to be called recursively.
1178
1179   @type disk: L{objects.Disk}
1180   @param disk: the disk object we should remove
1181   @rtype: boolean
1182   @return: the success of the operation
1183
1184   """
1185   msgs = []
1186   try:
1187     rdev = _RecursiveFindBD(disk)
1188   except errors.BlockDeviceError, err:
1189     # probably can't attach
1190     logging.info("Can't attach to device %s in remove", disk)
1191     rdev = None
1192   if rdev is not None:
1193     r_path = rdev.dev_path
1194     try:
1195       rdev.Remove()
1196     except errors.BlockDeviceError, err:
1197       msgs.append(str(err))
1198     if not msgs:
1199       DevCacheManager.RemoveCache(r_path)
1200
1201   if disk.children:
1202     for child in disk.children:
1203       try:
1204         BlockdevRemove(child)
1205       except RPCFail, err:
1206         msgs.append(str(err))
1207
1208   if msgs:
1209     _Fail("; ".join(msgs))
1210
1211
1212 def _RecursiveAssembleBD(disk, owner, as_primary):
1213   """Activate a block device for an instance.
1214
1215   This is run on the primary and secondary nodes for an instance.
1216
1217   @note: this function is called recursively.
1218
1219   @type disk: L{objects.Disk}
1220   @param disk: the disk we try to assemble
1221   @type owner: str
1222   @param owner: the name of the instance which owns the disk
1223   @type as_primary: boolean
1224   @param as_primary: if we should make the block device
1225       read/write
1226
1227   @return: the assembled device or None (in case no device
1228       was assembled)
1229   @raise errors.BlockDeviceError: in case there is an error
1230       during the activation of the children or the device
1231       itself
1232
1233   """
1234   children = []
1235   if disk.children:
1236     mcn = disk.ChildrenNeeded()
1237     if mcn == -1:
1238       mcn = 0 # max number of Nones allowed
1239     else:
1240       mcn = len(disk.children) - mcn # max number of Nones
1241     for chld_disk in disk.children:
1242       try:
1243         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1244       except errors.BlockDeviceError, err:
1245         if children.count(None) >= mcn:
1246           raise
1247         cdev = None
1248         logging.error("Error in child activation (but continuing): %s",
1249                       str(err))
1250       children.append(cdev)
1251
1252   if as_primary or disk.AssembleOnSecondary():
1253     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1254     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1255     result = r_dev
1256     if as_primary or disk.OpenOnSecondary():
1257       r_dev.Open()
1258     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1259                                 as_primary, disk.iv_name)
1260
1261   else:
1262     result = True
1263   return result
1264
1265
1266 def BlockdevAssemble(disk, owner, as_primary):
1267   """Activate a block device for an instance.
1268
1269   This is a wrapper over _RecursiveAssembleBD.
1270
1271   @rtype: str or boolean
1272   @return: a C{/dev/...} path for primary nodes, and
1273       C{True} for secondary nodes
1274
1275   """
1276   try:
1277     result = _RecursiveAssembleBD(disk, owner, as_primary)
1278     if isinstance(result, bdev.BlockDev):
1279       result = result.dev_path
1280   except errors.BlockDeviceError, err:
1281     _Fail("Error while assembling disk: %s", err, exc=True)
1282
1283   return result
1284
1285
1286 def BlockdevShutdown(disk):
1287   """Shut down a block device.
1288
1289   First, if the device is assembled (Attach() is successful), then
1290   the device is shutdown. Then the children of the device are
1291   shutdown.
1292
1293   This function is called recursively. Note that we don't cache the
1294   children or such, as oppossed to assemble, shutdown of different
1295   devices doesn't require that the upper device was active.
1296
1297   @type disk: L{objects.Disk}
1298   @param disk: the description of the disk we should
1299       shutdown
1300   @rtype: None
1301
1302   """
1303   msgs = []
1304   r_dev = _RecursiveFindBD(disk)
1305   if r_dev is not None:
1306     r_path = r_dev.dev_path
1307     try:
1308       r_dev.Shutdown()
1309       DevCacheManager.RemoveCache(r_path)
1310     except errors.BlockDeviceError, err:
1311       msgs.append(str(err))
1312
1313   if disk.children:
1314     for child in disk.children:
1315       try:
1316         BlockdevShutdown(child)
1317       except RPCFail, err:
1318         msgs.append(str(err))
1319
1320   if msgs:
1321     _Fail("; ".join(msgs))
1322
1323
1324 def BlockdevAddchildren(parent_cdev, new_cdevs):
1325   """Extend a mirrored block device.
1326
1327   @type parent_cdev: L{objects.Disk}
1328   @param parent_cdev: the disk to which we should add children
1329   @type new_cdevs: list of L{objects.Disk}
1330   @param new_cdevs: the list of children which we should add
1331   @rtype: None
1332
1333   """
1334   parent_bdev = _RecursiveFindBD(parent_cdev)
1335   if parent_bdev is None:
1336     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1337   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1338   if new_bdevs.count(None) > 0:
1339     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1340   parent_bdev.AddChildren(new_bdevs)
1341
1342
1343 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1344   """Shrink a mirrored block device.
1345
1346   @type parent_cdev: L{objects.Disk}
1347   @param parent_cdev: the disk from which we should remove children
1348   @type new_cdevs: list of L{objects.Disk}
1349   @param new_cdevs: the list of children which we should remove
1350   @rtype: None
1351
1352   """
1353   parent_bdev = _RecursiveFindBD(parent_cdev)
1354   if parent_bdev is None:
1355     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1356   devs = []
1357   for disk in new_cdevs:
1358     rpath = disk.StaticDevPath()
1359     if rpath is None:
1360       bd = _RecursiveFindBD(disk)
1361       if bd is None:
1362         _Fail("Can't find device %s while removing children", disk)
1363       else:
1364         devs.append(bd.dev_path)
1365     else:
1366       devs.append(rpath)
1367   parent_bdev.RemoveChildren(devs)
1368
1369
1370 def BlockdevGetmirrorstatus(disks):
1371   """Get the mirroring status of a list of devices.
1372
1373   @type disks: list of L{objects.Disk}
1374   @param disks: the list of disks which we should query
1375   @rtype: disk
1376   @return:
1377       a list of (mirror_done, estimated_time) tuples, which
1378       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1379   @raise errors.BlockDeviceError: if any of the disks cannot be
1380       found
1381
1382   """
1383   stats = []
1384   for dsk in disks:
1385     rbd = _RecursiveFindBD(dsk)
1386     if rbd is None:
1387       _Fail("Can't find device %s", dsk)
1388
1389     stats.append(rbd.CombinedSyncStatus())
1390
1391   return stats
1392
1393
1394 def _RecursiveFindBD(disk):
1395   """Check if a device is activated.
1396
1397   If so, return information about the real device.
1398
1399   @type disk: L{objects.Disk}
1400   @param disk: the disk object we need to find
1401
1402   @return: None if the device can't be found,
1403       otherwise the device instance
1404
1405   """
1406   children = []
1407   if disk.children:
1408     for chdisk in disk.children:
1409       children.append(_RecursiveFindBD(chdisk))
1410
1411   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1412
1413
1414 def BlockdevFind(disk):
1415   """Check if a device is activated.
1416
1417   If it is, return information about the real device.
1418
1419   @type disk: L{objects.Disk}
1420   @param disk: the disk to find
1421   @rtype: None or objects.BlockDevStatus
1422   @return: None if the disk cannot be found, otherwise a the current
1423            information
1424
1425   """
1426   try:
1427     rbd = _RecursiveFindBD(disk)
1428   except errors.BlockDeviceError, err:
1429     _Fail("Failed to find device: %s", err, exc=True)
1430
1431   if rbd is None:
1432     return None
1433
1434   return rbd.GetSyncStatus()
1435
1436
1437 def BlockdevGetsize(disks):
1438   """Computes the size of the given disks.
1439
1440   If a disk is not found, returns None instead.
1441
1442   @type disks: list of L{objects.Disk}
1443   @param disks: the list of disk to compute the size for
1444   @rtype: list
1445   @return: list with elements None if the disk cannot be found,
1446       otherwise the size
1447
1448   """
1449   result = []
1450   for cf in disks:
1451     try:
1452       rbd = _RecursiveFindBD(cf)
1453     except errors.BlockDeviceError, err:
1454       result.append(None)
1455       continue
1456     if rbd is None:
1457       result.append(None)
1458     else:
1459       result.append(rbd.GetActualSize())
1460   return result
1461
1462
1463 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1464   """Export a block device to a remote node.
1465
1466   @type disk: L{objects.Disk}
1467   @param disk: the description of the disk to export
1468   @type dest_node: str
1469   @param dest_node: the destination node to export to
1470   @type dest_path: str
1471   @param dest_path: the destination path on the target node
1472   @type cluster_name: str
1473   @param cluster_name: the cluster name, needed for SSH hostalias
1474   @rtype: None
1475
1476   """
1477   real_disk = _RecursiveFindBD(disk)
1478   if real_disk is None:
1479     _Fail("Block device '%s' is not set up", disk)
1480
1481   real_disk.Open()
1482
1483   # the block size on the read dd is 1MiB to match our units
1484   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1485                                "dd if=%s bs=1048576 count=%s",
1486                                real_disk.dev_path, str(disk.size))
1487
1488   # we set here a smaller block size as, due to ssh buffering, more
1489   # than 64-128k will mostly ignored; we use nocreat to fail if the
1490   # device is not already there or we pass a wrong path; we use
1491   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1492   # to not buffer too much memory; this means that at best, we flush
1493   # every 64k, which will not be very fast
1494   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1495                                 " oflag=dsync", dest_path)
1496
1497   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1498                                                    constants.GANETI_RUNAS,
1499                                                    destcmd)
1500
1501   # all commands have been checked, so we're safe to combine them
1502   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1503
1504   result = utils.RunCmd(["bash", "-c", command])
1505
1506   if result.failed:
1507     _Fail("Disk copy command '%s' returned error: %s"
1508           " output: %s", command, result.fail_reason, result.output)
1509
1510
1511 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1512   """Write a file to the filesystem.
1513
1514   This allows the master to overwrite(!) a file. It will only perform
1515   the operation if the file belongs to a list of configuration files.
1516
1517   @type file_name: str
1518   @param file_name: the target file name
1519   @type data: str
1520   @param data: the new contents of the file
1521   @type mode: int
1522   @param mode: the mode to give the file (can be None)
1523   @type uid: int
1524   @param uid: the owner of the file (can be -1 for default)
1525   @type gid: int
1526   @param gid: the group of the file (can be -1 for default)
1527   @type atime: float
1528   @param atime: the atime to set on the file (can be None)
1529   @type mtime: float
1530   @param mtime: the mtime to set on the file (can be None)
1531   @rtype: None
1532
1533   """
1534   if not os.path.isabs(file_name):
1535     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1536
1537   if file_name not in _ALLOWED_UPLOAD_FILES:
1538     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1539           file_name)
1540
1541   raw_data = _Decompress(data)
1542
1543   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1544                   atime=atime, mtime=mtime)
1545
1546
1547 def WriteSsconfFiles(values):
1548   """Update all ssconf files.
1549
1550   Wrapper around the SimpleStore.WriteFiles.
1551
1552   """
1553   ssconf.SimpleStore().WriteFiles(values)
1554
1555
1556 def _ErrnoOrStr(err):
1557   """Format an EnvironmentError exception.
1558
1559   If the L{err} argument has an errno attribute, it will be looked up
1560   and converted into a textual C{E...} description. Otherwise the
1561   string representation of the error will be returned.
1562
1563   @type err: L{EnvironmentError}
1564   @param err: the exception to format
1565
1566   """
1567   if hasattr(err, 'errno'):
1568     detail = errno.errorcode[err.errno]
1569   else:
1570     detail = str(err)
1571   return detail
1572
1573
1574 def _OSOndiskAPIVersion(name, os_dir):
1575   """Compute and return the API version of a given OS.
1576
1577   This function will try to read the API version of the OS given by
1578   the 'name' parameter and residing in the 'os_dir' directory.
1579
1580   @type name: str
1581   @param name: the OS name we should look for
1582   @type os_dir: str
1583   @param os_dir: the directory inwhich we should look for the OS
1584   @rtype: tuple
1585   @return: tuple (status, data) with status denoting the validity and
1586       data holding either the vaid versions or an error message
1587
1588   """
1589   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1590
1591   try:
1592     st = os.stat(api_file)
1593   except EnvironmentError, err:
1594     return False, ("Required file 'ganeti_api_version' file not"
1595                    " found under path %s: %s" % (os_dir, _ErrnoOrStr(err)))
1596
1597   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1598     return False, ("File 'ganeti_api_version' file at %s is not"
1599                    " a regular file" % os_dir)
1600
1601   try:
1602     api_versions = utils.ReadFile(api_file).splitlines()
1603   except EnvironmentError, err:
1604     return False, ("Error while reading the API version file at %s: %s" %
1605                    (api_file, _ErrnoOrStr(err)))
1606
1607   try:
1608     api_versions = [int(version.strip()) for version in api_versions]
1609   except (TypeError, ValueError), err:
1610     return False, ("API version(s) can't be converted to integer: %s" %
1611                    str(err))
1612
1613   return True, api_versions
1614
1615
1616 def DiagnoseOS(top_dirs=None):
1617   """Compute the validity for all OSes.
1618
1619   @type top_dirs: list
1620   @param top_dirs: the list of directories in which to
1621       search (if not given defaults to
1622       L{constants.OS_SEARCH_PATH})
1623   @rtype: list of L{objects.OS}
1624   @return: a list of tuples (name, path, status, diagnose)
1625       for all (potential) OSes under all search paths, where:
1626           - name is the (potential) OS name
1627           - path is the full path to the OS
1628           - status True/False is the validity of the OS
1629           - diagnose is the error message for an invalid OS, otherwise empty
1630
1631   """
1632   if top_dirs is None:
1633     top_dirs = constants.OS_SEARCH_PATH
1634
1635   result = []
1636   for dir_name in top_dirs:
1637     if os.path.isdir(dir_name):
1638       try:
1639         f_names = utils.ListVisibleFiles(dir_name)
1640       except EnvironmentError, err:
1641         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1642         break
1643       for name in f_names:
1644         os_path = os.path.sep.join([dir_name, name])
1645         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1646         if status:
1647           diagnose = ""
1648         else:
1649           diagnose = os_inst
1650         result.append((name, os_path, status, diagnose))
1651
1652   return result
1653
1654
1655 def _TryOSFromDisk(name, base_dir=None):
1656   """Create an OS instance from disk.
1657
1658   This function will return an OS instance if the given name is a
1659   valid OS name.
1660
1661   @type base_dir: string
1662   @keyword base_dir: Base directory containing OS installations.
1663                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1664   @rtype: tuple
1665   @return: success and either the OS instance if we find a valid one,
1666       or error message
1667
1668   """
1669   if base_dir is None:
1670     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1671     if os_dir is None:
1672       return False, "Directory for OS %s not found in search path" % name
1673   else:
1674     os_dir = os.path.sep.join([base_dir, name])
1675
1676   status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1677   if not status:
1678     # push the error up
1679     return status, api_versions
1680
1681   if not constants.OS_API_VERSIONS.intersection(api_versions):
1682     return False, ("API version mismatch for path '%s': found %s, want %s." %
1683                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1684
1685   # OS Scripts dictionary, we will populate it with the actual script names
1686   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1687
1688   for script in os_scripts:
1689     os_scripts[script] = os.path.sep.join([os_dir, script])
1690
1691     try:
1692       st = os.stat(os_scripts[script])
1693     except EnvironmentError, err:
1694       return False, ("Script '%s' under path '%s' is missing (%s)" %
1695                      (script, os_dir, _ErrnoOrStr(err)))
1696
1697     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1698       return False, ("Script '%s' under path '%s' is not executable" %
1699                      (script, os_dir))
1700
1701     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1702       return False, ("Script '%s' under path '%s' is not a regular file" %
1703                      (script, os_dir))
1704
1705   os_obj = objects.OS(name=name, path=os_dir,
1706                       create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1707                       export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1708                       import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1709                       rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1710                       api_versions=api_versions)
1711   return True, os_obj
1712
1713
1714 def OSFromDisk(name, base_dir=None):
1715   """Create an OS instance from disk.
1716
1717   This function will return an OS instance if the given name is a
1718   valid OS name. Otherwise, it will raise an appropriate
1719   L{RPCFail} exception, detailing why this is not a valid OS.
1720
1721   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1722   an exception but returns true/false status data.
1723
1724   @type base_dir: string
1725   @keyword base_dir: Base directory containing OS installations.
1726                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1727   @rtype: L{objects.OS}
1728   @return: the OS instance if we find a valid one
1729   @raise RPCFail: if we don't find a valid OS
1730
1731   """
1732   status, payload = _TryOSFromDisk(name, base_dir)
1733
1734   if not status:
1735     _Fail(payload)
1736
1737   return payload
1738
1739
1740 def OSEnvironment(instance, os, debug=0):
1741   """Calculate the environment for an os script.
1742
1743   @type instance: L{objects.Instance}
1744   @param instance: target instance for the os script run
1745   @type os: L{objects.OS}
1746   @param os: operating system for which the environment is being built
1747   @type debug: integer
1748   @param debug: debug level (0 or 1, for OS Api 10)
1749   @rtype: dict
1750   @return: dict of environment variables
1751   @raise errors.BlockDeviceError: if the block device
1752       cannot be found
1753
1754   """
1755   result = {}
1756   api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
1757   result['OS_API_VERSION'] = '%d' % api_version
1758   result['INSTANCE_NAME'] = instance.name
1759   result['INSTANCE_OS'] = instance.os
1760   result['HYPERVISOR'] = instance.hypervisor
1761   result['DISK_COUNT'] = '%d' % len(instance.disks)
1762   result['NIC_COUNT'] = '%d' % len(instance.nics)
1763   result['DEBUG_LEVEL'] = '%d' % debug
1764   for idx, disk in enumerate(instance.disks):
1765     real_disk = _RecursiveFindBD(disk)
1766     if real_disk is None:
1767       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1768                                     str(disk))
1769     real_disk.Open()
1770     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1771     result['DISK_%d_ACCESS' % idx] = disk.mode
1772     if constants.HV_DISK_TYPE in instance.hvparams:
1773       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1774         instance.hvparams[constants.HV_DISK_TYPE]
1775     if disk.dev_type in constants.LDS_BLOCK:
1776       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1777     elif disk.dev_type == constants.LD_FILE:
1778       result['DISK_%d_BACKEND_TYPE' % idx] = \
1779         'file:%s' % disk.physical_id[0]
1780   for idx, nic in enumerate(instance.nics):
1781     result['NIC_%d_MAC' % idx] = nic.mac
1782     if nic.ip:
1783       result['NIC_%d_IP' % idx] = nic.ip
1784     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1785     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1786       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1787     if nic.nicparams[constants.NIC_LINK]:
1788       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1789     if constants.HV_NIC_TYPE in instance.hvparams:
1790       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1791         instance.hvparams[constants.HV_NIC_TYPE]
1792
1793   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1794     for key, value in source.items():
1795       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1796
1797   return result
1798
1799 def BlockdevGrow(disk, amount):
1800   """Grow a stack of block devices.
1801
1802   This function is called recursively, with the childrens being the
1803   first ones to resize.
1804
1805   @type disk: L{objects.Disk}
1806   @param disk: the disk to be grown
1807   @rtype: (status, result)
1808   @return: a tuple with the status of the operation
1809       (True/False), and the errors message if status
1810       is False
1811
1812   """
1813   r_dev = _RecursiveFindBD(disk)
1814   if r_dev is None:
1815     _Fail("Cannot find block device %s", disk)
1816
1817   try:
1818     r_dev.Grow(amount)
1819   except errors.BlockDeviceError, err:
1820     _Fail("Failed to grow block device: %s", err, exc=True)
1821
1822
1823 def BlockdevSnapshot(disk):
1824   """Create a snapshot copy of a block device.
1825
1826   This function is called recursively, and the snapshot is actually created
1827   just for the leaf lvm backend device.
1828
1829   @type disk: L{objects.Disk}
1830   @param disk: the disk to be snapshotted
1831   @rtype: string
1832   @return: snapshot disk path
1833
1834   """
1835   if disk.children:
1836     if len(disk.children) == 1:
1837       # only one child, let's recurse on it
1838       return BlockdevSnapshot(disk.children[0])
1839     else:
1840       # more than one child, choose one that matches
1841       for child in disk.children:
1842         if child.size == disk.size:
1843           # return implies breaking the loop
1844           return BlockdevSnapshot(child)
1845   elif disk.dev_type == constants.LD_LV:
1846     r_dev = _RecursiveFindBD(disk)
1847     if r_dev is not None:
1848       # let's stay on the safe side and ask for the full size, for now
1849       return r_dev.Snapshot(disk.size)
1850     else:
1851       _Fail("Cannot find block device %s", disk)
1852   else:
1853     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1854           disk.unique_id, disk.dev_type)
1855
1856
1857 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1858   """Export a block device snapshot to a remote node.
1859
1860   @type disk: L{objects.Disk}
1861   @param disk: the description of the disk to export
1862   @type dest_node: str
1863   @param dest_node: the destination node to export to
1864   @type instance: L{objects.Instance}
1865   @param instance: the instance object to whom the disk belongs
1866   @type cluster_name: str
1867   @param cluster_name: the cluster name, needed for SSH hostalias
1868   @type idx: int
1869   @param idx: the index of the disk in the instance's disk list,
1870       used to export to the OS scripts environment
1871   @rtype: None
1872
1873   """
1874   inst_os = OSFromDisk(instance.os)
1875   export_env = OSEnvironment(instance, inst_os)
1876
1877   export_script = inst_os.export_script
1878
1879   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1880                                      instance.name, int(time.time()))
1881   if not os.path.exists(constants.LOG_OS_DIR):
1882     os.mkdir(constants.LOG_OS_DIR, 0750)
1883   real_disk = _RecursiveFindBD(disk)
1884   if real_disk is None:
1885     _Fail("Block device '%s' is not set up", disk)
1886
1887   real_disk.Open()
1888
1889   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1890   export_env['EXPORT_INDEX'] = str(idx)
1891
1892   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1893   destfile = disk.physical_id[1]
1894
1895   # the target command is built out of three individual commands,
1896   # which are joined by pipes; we check each individual command for
1897   # valid parameters
1898   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1899                                inst_os.path, export_script, logfile)
1900
1901   comprcmd = "gzip"
1902
1903   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1904                                 destdir, destdir, destfile)
1905   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1906                                                    constants.GANETI_RUNAS,
1907                                                    destcmd)
1908
1909   # all commands have been checked, so we're safe to combine them
1910   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1911
1912   result = utils.RunCmd(["bash", "-c", command], env=export_env)
1913
1914   if result.failed:
1915     _Fail("OS snapshot export command '%s' returned error: %s"
1916           " output: %s", command, result.fail_reason, result.output)
1917
1918
1919 def FinalizeExport(instance, snap_disks):
1920   """Write out the export configuration information.
1921
1922   @type instance: L{objects.Instance}
1923   @param instance: the instance which we export, used for
1924       saving configuration
1925   @type snap_disks: list of L{objects.Disk}
1926   @param snap_disks: list of snapshot block devices, which
1927       will be used to get the actual name of the dump file
1928
1929   @rtype: None
1930
1931   """
1932   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1933   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1934
1935   config = objects.SerializableConfigParser()
1936
1937   config.add_section(constants.INISECT_EXP)
1938   config.set(constants.INISECT_EXP, 'version', '0')
1939   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1940   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1941   config.set(constants.INISECT_EXP, 'os', instance.os)
1942   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1943
1944   config.add_section(constants.INISECT_INS)
1945   config.set(constants.INISECT_INS, 'name', instance.name)
1946   config.set(constants.INISECT_INS, 'memory', '%d' %
1947              instance.beparams[constants.BE_MEMORY])
1948   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1949              instance.beparams[constants.BE_VCPUS])
1950   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1951
1952   nic_total = 0
1953   for nic_count, nic in enumerate(instance.nics):
1954     nic_total += 1
1955     config.set(constants.INISECT_INS, 'nic%d_mac' %
1956                nic_count, '%s' % nic.mac)
1957     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1958     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1959                '%s' % nic.bridge)
1960   # TODO: redundant: on load can read nics until it doesn't exist
1961   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1962
1963   disk_total = 0
1964   for disk_count, disk in enumerate(snap_disks):
1965     if disk:
1966       disk_total += 1
1967       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1968                  ('%s' % disk.iv_name))
1969       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1970                  ('%s' % disk.physical_id[1]))
1971       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1972                  ('%d' % disk.size))
1973
1974   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1975
1976   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1977                   data=config.Dumps())
1978   shutil.rmtree(finaldestdir, True)
1979   shutil.move(destdir, finaldestdir)
1980
1981
1982 def ExportInfo(dest):
1983   """Get export configuration information.
1984
1985   @type dest: str
1986   @param dest: directory containing the export
1987
1988   @rtype: L{objects.SerializableConfigParser}
1989   @return: a serializable config file containing the
1990       export info
1991
1992   """
1993   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1994
1995   config = objects.SerializableConfigParser()
1996   config.read(cff)
1997
1998   if (not config.has_section(constants.INISECT_EXP) or
1999       not config.has_section(constants.INISECT_INS)):
2000     _Fail("Export info file doesn't have the required fields")
2001
2002   return config.Dumps()
2003
2004
2005 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2006   """Import an os image into an instance.
2007
2008   @type instance: L{objects.Instance}
2009   @param instance: instance to import the disks into
2010   @type src_node: string
2011   @param src_node: source node for the disk images
2012   @type src_images: list of string
2013   @param src_images: absolute paths of the disk images
2014   @rtype: list of boolean
2015   @return: each boolean represent the success of importing the n-th disk
2016
2017   """
2018   inst_os = OSFromDisk(instance.os)
2019   import_env = OSEnvironment(instance, inst_os)
2020   import_script = inst_os.import_script
2021
2022   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2023                                         instance.name, int(time.time()))
2024   if not os.path.exists(constants.LOG_OS_DIR):
2025     os.mkdir(constants.LOG_OS_DIR, 0750)
2026
2027   comprcmd = "gunzip"
2028   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2029                                import_script, logfile)
2030
2031   final_result = []
2032   for idx, image in enumerate(src_images):
2033     if image:
2034       destcmd = utils.BuildShellCmd('cat %s', image)
2035       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2036                                                        constants.GANETI_RUNAS,
2037                                                        destcmd)
2038       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2039       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2040       import_env['IMPORT_INDEX'] = str(idx)
2041       result = utils.RunCmd(command, env=import_env)
2042       if result.failed:
2043         logging.error("Disk import command '%s' returned error: %s"
2044                       " output: %s", command, result.fail_reason,
2045                       result.output)
2046         final_result.append("error importing disk %d: %s, %s" %
2047                             (idx, result.fail_reason, result.output[-100]))
2048
2049   if final_result:
2050     _Fail("; ".join(final_result), log=False)
2051
2052
2053 def ListExports():
2054   """Return a list of exports currently available on this machine.
2055
2056   @rtype: list
2057   @return: list of the exports
2058
2059   """
2060   if os.path.isdir(constants.EXPORT_DIR):
2061     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2062   else:
2063     _Fail("No exports directory")
2064
2065
2066 def RemoveExport(export):
2067   """Remove an existing export from the node.
2068
2069   @type export: str
2070   @param export: the name of the export to remove
2071   @rtype: None
2072
2073   """
2074   target = os.path.join(constants.EXPORT_DIR, export)
2075
2076   try:
2077     shutil.rmtree(target)
2078   except EnvironmentError, err:
2079     _Fail("Error while removing the export: %s", err, exc=True)
2080
2081
2082 def BlockdevRename(devlist):
2083   """Rename a list of block devices.
2084
2085   @type devlist: list of tuples
2086   @param devlist: list of tuples of the form  (disk,
2087       new_logical_id, new_physical_id); disk is an
2088       L{objects.Disk} object describing the current disk,
2089       and new logical_id/physical_id is the name we
2090       rename it to
2091   @rtype: boolean
2092   @return: True if all renames succeeded, False otherwise
2093
2094   """
2095   msgs = []
2096   result = True
2097   for disk, unique_id in devlist:
2098     dev = _RecursiveFindBD(disk)
2099     if dev is None:
2100       msgs.append("Can't find device %s in rename" % str(disk))
2101       result = False
2102       continue
2103     try:
2104       old_rpath = dev.dev_path
2105       dev.Rename(unique_id)
2106       new_rpath = dev.dev_path
2107       if old_rpath != new_rpath:
2108         DevCacheManager.RemoveCache(old_rpath)
2109         # FIXME: we should add the new cache information here, like:
2110         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2111         # but we don't have the owner here - maybe parse from existing
2112         # cache? for now, we only lose lvm data when we rename, which
2113         # is less critical than DRBD or MD
2114     except errors.BlockDeviceError, err:
2115       msgs.append("Can't rename device '%s' to '%s': %s" %
2116                   (dev, unique_id, err))
2117       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2118       result = False
2119   if not result:
2120     _Fail("; ".join(msgs))
2121
2122
2123 def _TransformFileStorageDir(file_storage_dir):
2124   """Checks whether given file_storage_dir is valid.
2125
2126   Checks wheter the given file_storage_dir is within the cluster-wide
2127   default file_storage_dir stored in SimpleStore. Only paths under that
2128   directory are allowed.
2129
2130   @type file_storage_dir: str
2131   @param file_storage_dir: the path to check
2132
2133   @return: the normalized path if valid, None otherwise
2134
2135   """
2136   cfg = _GetConfig()
2137   file_storage_dir = os.path.normpath(file_storage_dir)
2138   base_file_storage_dir = cfg.GetFileStorageDir()
2139   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2140       base_file_storage_dir):
2141     _Fail("File storage directory '%s' is not under base file"
2142           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2143   return file_storage_dir
2144
2145
2146 def CreateFileStorageDir(file_storage_dir):
2147   """Create file storage directory.
2148
2149   @type file_storage_dir: str
2150   @param file_storage_dir: directory to create
2151
2152   @rtype: tuple
2153   @return: tuple with first element a boolean indicating wheter dir
2154       creation was successful or not
2155
2156   """
2157   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2158   if os.path.exists(file_storage_dir):
2159     if not os.path.isdir(file_storage_dir):
2160       _Fail("Specified storage dir '%s' is not a directory",
2161             file_storage_dir)
2162   else:
2163     try:
2164       os.makedirs(file_storage_dir, 0750)
2165     except OSError, err:
2166       _Fail("Cannot create file storage directory '%s': %s",
2167             file_storage_dir, err, exc=True)
2168
2169
2170 def RemoveFileStorageDir(file_storage_dir):
2171   """Remove file storage directory.
2172
2173   Remove it only if it's empty. If not log an error and return.
2174
2175   @type file_storage_dir: str
2176   @param file_storage_dir: the directory we should cleanup
2177   @rtype: tuple (success,)
2178   @return: tuple of one element, C{success}, denoting
2179       whether the operation was successful
2180
2181   """
2182   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2183   if os.path.exists(file_storage_dir):
2184     if not os.path.isdir(file_storage_dir):
2185       _Fail("Specified Storage directory '%s' is not a directory",
2186             file_storage_dir)
2187     # deletes dir only if empty, otherwise we want to fail the rpc call
2188     try:
2189       os.rmdir(file_storage_dir)
2190     except OSError, err:
2191       _Fail("Cannot remove file storage directory '%s': %s",
2192             file_storage_dir, err)
2193
2194
2195 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2196   """Rename the file storage directory.
2197
2198   @type old_file_storage_dir: str
2199   @param old_file_storage_dir: the current path
2200   @type new_file_storage_dir: str
2201   @param new_file_storage_dir: the name we should rename to
2202   @rtype: tuple (success,)
2203   @return: tuple of one element, C{success}, denoting
2204       whether the operation was successful
2205
2206   """
2207   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2208   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2209   if not os.path.exists(new_file_storage_dir):
2210     if os.path.isdir(old_file_storage_dir):
2211       try:
2212         os.rename(old_file_storage_dir, new_file_storage_dir)
2213       except OSError, err:
2214         _Fail("Cannot rename '%s' to '%s': %s",
2215               old_file_storage_dir, new_file_storage_dir, err)
2216     else:
2217       _Fail("Specified storage dir '%s' is not a directory",
2218             old_file_storage_dir)
2219   else:
2220     if os.path.exists(old_file_storage_dir):
2221       _Fail("Cannot rename '%s' to '%s': both locations exist",
2222             old_file_storage_dir, new_file_storage_dir)
2223
2224
2225 def _EnsureJobQueueFile(file_name):
2226   """Checks whether the given filename is in the queue directory.
2227
2228   @type file_name: str
2229   @param file_name: the file name we should check
2230   @rtype: None
2231   @raises RPCFail: if the file is not valid
2232
2233   """
2234   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2235   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2236
2237   if not result:
2238     _Fail("Passed job queue file '%s' does not belong to"
2239           " the queue directory '%s'", file_name, queue_dir)
2240
2241
2242 def JobQueueUpdate(file_name, content):
2243   """Updates a file in the queue directory.
2244
2245   This is just a wrapper over L{utils.WriteFile}, with proper
2246   checking.
2247
2248   @type file_name: str
2249   @param file_name: the job file name
2250   @type content: str
2251   @param content: the new job contents
2252   @rtype: boolean
2253   @return: the success of the operation
2254
2255   """
2256   _EnsureJobQueueFile(file_name)
2257
2258   # Write and replace the file atomically
2259   utils.WriteFile(file_name, data=_Decompress(content))
2260
2261
2262 def JobQueueRename(old, new):
2263   """Renames a job queue file.
2264
2265   This is just a wrapper over os.rename with proper checking.
2266
2267   @type old: str
2268   @param old: the old (actual) file name
2269   @type new: str
2270   @param new: the desired file name
2271   @rtype: tuple
2272   @return: the success of the operation and payload
2273
2274   """
2275   _EnsureJobQueueFile(old)
2276   _EnsureJobQueueFile(new)
2277
2278   utils.RenameFile(old, new, mkdir=True)
2279
2280
2281 def JobQueueSetDrainFlag(drain_flag):
2282   """Set the drain flag for the queue.
2283
2284   This will set or unset the queue drain flag.
2285
2286   @type drain_flag: boolean
2287   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2288   @rtype: truple
2289   @return: always True, None
2290   @warning: the function always returns True
2291
2292   """
2293   if drain_flag:
2294     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2295   else:
2296     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2297
2298
2299 def BlockdevClose(instance_name, disks):
2300   """Closes the given block devices.
2301
2302   This means they will be switched to secondary mode (in case of
2303   DRBD).
2304
2305   @param instance_name: if the argument is not empty, the symlinks
2306       of this instance will be removed
2307   @type disks: list of L{objects.Disk}
2308   @param disks: the list of disks to be closed
2309   @rtype: tuple (success, message)
2310   @return: a tuple of success and message, where success
2311       indicates the succes of the operation, and message
2312       which will contain the error details in case we
2313       failed
2314
2315   """
2316   bdevs = []
2317   for cf in disks:
2318     rd = _RecursiveFindBD(cf)
2319     if rd is None:
2320       _Fail("Can't find device %s", cf)
2321     bdevs.append(rd)
2322
2323   msg = []
2324   for rd in bdevs:
2325     try:
2326       rd.Close()
2327     except errors.BlockDeviceError, err:
2328       msg.append(str(err))
2329   if msg:
2330     _Fail("Can't make devices secondary: %s", ",".join(msg))
2331   else:
2332     if instance_name:
2333       _RemoveBlockDevLinks(instance_name, disks)
2334
2335
2336 def ValidateHVParams(hvname, hvparams):
2337   """Validates the given hypervisor parameters.
2338
2339   @type hvname: string
2340   @param hvname: the hypervisor name
2341   @type hvparams: dict
2342   @param hvparams: the hypervisor parameters to be validated
2343   @rtype: None
2344
2345   """
2346   try:
2347     hv_type = hypervisor.GetHypervisor(hvname)
2348     hv_type.ValidateParameters(hvparams)
2349   except errors.HypervisorError, err:
2350     _Fail(str(err), log=False)
2351
2352
2353 def DemoteFromMC():
2354   """Demotes the current node from master candidate role.
2355
2356   """
2357   # try to ensure we're not the master by mistake
2358   master, myself = ssconf.GetMasterAndMyself()
2359   if master == myself:
2360     _Fail("ssconf status shows I'm the master node, will not demote")
2361   pid_file = utils.DaemonPidFileName(constants.MASTERD)
2362   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2363     _Fail("The master daemon is running, will not demote")
2364   try:
2365     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2366       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2367   except EnvironmentError, err:
2368     if err.errno != errno.ENOENT:
2369       _Fail("Error while backing up cluster file: %s", err, exc=True)
2370   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2371
2372
2373 def _FindDisks(nodes_ip, disks):
2374   """Sets the physical ID on disks and returns the block devices.
2375
2376   """
2377   # set the correct physical ID
2378   my_name = utils.HostInfo().name
2379   for cf in disks:
2380     cf.SetPhysicalID(my_name, nodes_ip)
2381
2382   bdevs = []
2383
2384   for cf in disks:
2385     rd = _RecursiveFindBD(cf)
2386     if rd is None:
2387       _Fail("Can't find device %s", cf)
2388     bdevs.append(rd)
2389   return bdevs
2390
2391
2392 def DrbdDisconnectNet(nodes_ip, disks):
2393   """Disconnects the network on a list of drbd devices.
2394
2395   """
2396   bdevs = _FindDisks(nodes_ip, disks)
2397
2398   # disconnect disks
2399   for rd in bdevs:
2400     try:
2401       rd.DisconnectNet()
2402     except errors.BlockDeviceError, err:
2403       _Fail("Can't change network configuration to standalone mode: %s",
2404             err, exc=True)
2405
2406
2407 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2408   """Attaches the network on a list of drbd devices.
2409
2410   """
2411   bdevs = _FindDisks(nodes_ip, disks)
2412
2413   if multimaster:
2414     for idx, rd in enumerate(bdevs):
2415       try:
2416         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2417       except EnvironmentError, err:
2418         _Fail("Can't create symlink: %s", err)
2419   # reconnect disks, switch to new master configuration and if
2420   # needed primary mode
2421   for rd in bdevs:
2422     try:
2423       rd.AttachNet(multimaster)
2424     except errors.BlockDeviceError, err:
2425       _Fail("Can't change network configuration: %s", err)
2426   # wait until the disks are connected; we need to retry the re-attach
2427   # if the device becomes standalone, as this might happen if the one
2428   # node disconnects and reconnects in a different mode before the
2429   # other node reconnects; in this case, one or both of the nodes will
2430   # decide it has wrong configuration and switch to standalone
2431   RECONNECT_TIMEOUT = 2 * 60
2432   sleep_time = 0.100 # start with 100 miliseconds
2433   timeout_limit = time.time() + RECONNECT_TIMEOUT
2434   while time.time() < timeout_limit:
2435     all_connected = True
2436     for rd in bdevs:
2437       stats = rd.GetProcStatus()
2438       if not (stats.is_connected or stats.is_in_resync):
2439         all_connected = False
2440       if stats.is_standalone:
2441         # peer had different config info and this node became
2442         # standalone, even though this should not happen with the
2443         # new staged way of changing disk configs
2444         try:
2445           rd.AttachNet(multimaster)
2446         except errors.BlockDeviceError, err:
2447           _Fail("Can't change network configuration: %s", err)
2448     if all_connected:
2449       break
2450     time.sleep(sleep_time)
2451     sleep_time = min(5, sleep_time * 1.5)
2452   if not all_connected:
2453     _Fail("Timeout in disk reconnecting")
2454   if multimaster:
2455     # change to primary mode
2456     for rd in bdevs:
2457       try:
2458         rd.Open()
2459       except errors.BlockDeviceError, err:
2460         _Fail("Can't change to primary mode: %s", err)
2461
2462
2463 def DrbdWaitSync(nodes_ip, disks):
2464   """Wait until DRBDs have synchronized.
2465
2466   """
2467   bdevs = _FindDisks(nodes_ip, disks)
2468
2469   min_resync = 100
2470   alldone = True
2471   for rd in bdevs:
2472     stats = rd.GetProcStatus()
2473     if not (stats.is_connected or stats.is_in_resync):
2474       _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2475     alldone = alldone and (not stats.is_in_resync)
2476     if stats.sync_percent is not None:
2477       min_resync = min(min_resync, stats.sync_percent)
2478
2479   return (alldone, min_resync)
2480
2481
2482 def PowercycleNode(hypervisor_type):
2483   """Hard-powercycle the node.
2484
2485   Because we need to return first, and schedule the powercycle in the
2486   background, we won't be able to report failures nicely.
2487
2488   """
2489   hyper = hypervisor.GetHypervisor(hypervisor_type)
2490   try:
2491     pid = os.fork()
2492   except OSError:
2493     # if we can't fork, we'll pretend that we're in the child process
2494     pid = 0
2495   if pid > 0:
2496     return "Reboot scheduled in 5 seconds"
2497   time.sleep(5)
2498   hyper.PowercycleNode()
2499
2500
2501 class HooksRunner(object):
2502   """Hook runner.
2503
2504   This class is instantiated on the node side (ganeti-noded) and not
2505   on the master side.
2506
2507   """
2508   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2509
2510   def __init__(self, hooks_base_dir=None):
2511     """Constructor for hooks runner.
2512
2513     @type hooks_base_dir: str or None
2514     @param hooks_base_dir: if not None, this overrides the
2515         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2516
2517     """
2518     if hooks_base_dir is None:
2519       hooks_base_dir = constants.HOOKS_BASE_DIR
2520     self._BASE_DIR = hooks_base_dir
2521
2522   @staticmethod
2523   def ExecHook(script, env):
2524     """Exec one hook script.
2525
2526     @type script: str
2527     @param script: the full path to the script
2528     @type env: dict
2529     @param env: the environment with which to exec the script
2530     @rtype: tuple (success, message)
2531     @return: a tuple of success and message, where success
2532         indicates the succes of the operation, and message
2533         which will contain the error details in case we
2534         failed
2535
2536     """
2537     # exec the process using subprocess and log the output
2538     fdstdin = None
2539     try:
2540       fdstdin = open("/dev/null", "r")
2541       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2542                                stderr=subprocess.STDOUT, close_fds=True,
2543                                shell=False, cwd="/", env=env)
2544       output = ""
2545       try:
2546         output = child.stdout.read(4096)
2547         child.stdout.close()
2548       except EnvironmentError, err:
2549         output += "Hook script error: %s" % str(err)
2550
2551       while True:
2552         try:
2553           result = child.wait()
2554           break
2555         except EnvironmentError, err:
2556           if err.errno == errno.EINTR:
2557             continue
2558           raise
2559     finally:
2560       # try not to leak fds
2561       for fd in (fdstdin, ):
2562         if fd is not None:
2563           try:
2564             fd.close()
2565           except EnvironmentError, err:
2566             # just log the error
2567             #logging.exception("Error while closing fd %s", fd)
2568             pass
2569
2570     return result == 0, utils.SafeEncode(output.strip())
2571
2572   def RunHooks(self, hpath, phase, env):
2573     """Run the scripts in the hooks directory.
2574
2575     @type hpath: str
2576     @param hpath: the path to the hooks directory which
2577         holds the scripts
2578     @type phase: str
2579     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2580         L{constants.HOOKS_PHASE_POST}
2581     @type env: dict
2582     @param env: dictionary with the environment for the hook
2583     @rtype: list
2584     @return: list of 3-element tuples:
2585       - script path
2586       - script result, either L{constants.HKR_SUCCESS} or
2587         L{constants.HKR_FAIL}
2588       - output of the script
2589
2590     @raise errors.ProgrammerError: for invalid input
2591         parameters
2592
2593     """
2594     if phase == constants.HOOKS_PHASE_PRE:
2595       suffix = "pre"
2596     elif phase == constants.HOOKS_PHASE_POST:
2597       suffix = "post"
2598     else:
2599       _Fail("Unknown hooks phase '%s'", phase)
2600
2601     rr = []
2602
2603     subdir = "%s-%s.d" % (hpath, suffix)
2604     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2605     try:
2606       dir_contents = utils.ListVisibleFiles(dir_name)
2607     except OSError:
2608       # FIXME: must log output in case of failures
2609       return rr
2610
2611     # we use the standard python sort order,
2612     # so 00name is the recommended naming scheme
2613     dir_contents.sort()
2614     for relname in dir_contents:
2615       fname = os.path.join(dir_name, relname)
2616       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2617           self.RE_MASK.match(relname) is not None):
2618         rrval = constants.HKR_SKIP
2619         output = ""
2620       else:
2621         result, output = self.ExecHook(fname, env)
2622         if not result:
2623           rrval = constants.HKR_FAIL
2624         else:
2625           rrval = constants.HKR_SUCCESS
2626       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2627
2628     return rr
2629
2630
2631 class IAllocatorRunner(object):
2632   """IAllocator runner.
2633
2634   This class is instantiated on the node side (ganeti-noded) and not on
2635   the master side.
2636
2637   """
2638   def Run(self, name, idata):
2639     """Run an iallocator script.
2640
2641     @type name: str
2642     @param name: the iallocator script name
2643     @type idata: str
2644     @param idata: the allocator input data
2645
2646     @rtype: tuple
2647     @return: two element tuple of:
2648        - status
2649        - either error message or stdout of allocator (for success)
2650
2651     """
2652     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2653                                   os.path.isfile)
2654     if alloc_script is None:
2655       _Fail("iallocator module '%s' not found in the search path", name)
2656
2657     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2658     try:
2659       os.write(fd, idata)
2660       os.close(fd)
2661       result = utils.RunCmd([alloc_script, fin_name])
2662       if result.failed:
2663         _Fail("iallocator module '%s' failed: %s, output '%s'",
2664               name, result.fail_reason, result.output)
2665     finally:
2666       os.unlink(fin_name)
2667
2668     return result.stdout
2669
2670
2671 class DevCacheManager(object):
2672   """Simple class for managing a cache of block device information.
2673
2674   """
2675   _DEV_PREFIX = "/dev/"
2676   _ROOT_DIR = constants.BDEV_CACHE_DIR
2677
2678   @classmethod
2679   def _ConvertPath(cls, dev_path):
2680     """Converts a /dev/name path to the cache file name.
2681
2682     This replaces slashes with underscores and strips the /dev
2683     prefix. It then returns the full path to the cache file.
2684
2685     @type dev_path: str
2686     @param dev_path: the C{/dev/} path name
2687     @rtype: str
2688     @return: the converted path name
2689
2690     """
2691     if dev_path.startswith(cls._DEV_PREFIX):
2692       dev_path = dev_path[len(cls._DEV_PREFIX):]
2693     dev_path = dev_path.replace("/", "_")
2694     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2695     return fpath
2696
2697   @classmethod
2698   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2699     """Updates the cache information for a given device.
2700
2701     @type dev_path: str
2702     @param dev_path: the pathname of the device
2703     @type owner: str
2704     @param owner: the owner (instance name) of the device
2705     @type on_primary: bool
2706     @param on_primary: whether this is the primary
2707         node nor not
2708     @type iv_name: str
2709     @param iv_name: the instance-visible name of the
2710         device, as in objects.Disk.iv_name
2711
2712     @rtype: None
2713
2714     """
2715     if dev_path is None:
2716       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2717       return
2718     fpath = cls._ConvertPath(dev_path)
2719     if on_primary:
2720       state = "primary"
2721     else:
2722       state = "secondary"
2723     if iv_name is None:
2724       iv_name = "not_visible"
2725     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2726     try:
2727       utils.WriteFile(fpath, data=fdata)
2728     except EnvironmentError, err:
2729       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2730
2731   @classmethod
2732   def RemoveCache(cls, dev_path):
2733     """Remove data for a dev_path.
2734
2735     This is just a wrapper over L{utils.RemoveFile} with a converted
2736     path name and logging.
2737
2738     @type dev_path: str
2739     @param dev_path: the pathname of the device
2740
2741     @rtype: None
2742
2743     """
2744     if dev_path is None:
2745       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2746       return
2747     fpath = cls._ConvertPath(dev_path)
2748     try:
2749       utils.RemoveFile(fpath)
2750     except EnvironmentError, err:
2751       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)