#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
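# Example of a line the regex above accepts (a doctest-style sketch; real
# "lvs" output uses the "|" separator requested in GetVolumeList below and
# a six-character lv_attr field):
#
#   >>> _LVSLINE_REGEX.match("  xenvg|test1|20.06|-wi-ao").groups()
#   ('xenvg', 'test1', '20.06', '-wi-ao')
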

class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)

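# Typical use of _Fail above (a sketch mirroring calls later in this module):
# the format arguments are interpolated, the message is logged (with a
# traceback when exc=True is passed), and RPCFail is raised for the node
# daemon to turn into a 'failed' RPC result:
#
#   _Fail("Failed to list logical volumes, lvs output: %s", result.output)
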
def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")

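# Round-trip for _Decompress above (a doctest-style sketch; the encoding
# constants are the ones imported from the constants module):
#
#   >>> _Decompress((constants.RPC_ENCODING_NONE, "hello"))
#   'hello'
#   >>> blob = base64.b64encode(zlib.compress("hello"))
#   >>> _Decompress((constants.RPC_ENCODING_ZLIB_BASE64, blob))
#   'hello'
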
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.SPICE_CERT_FILE,
    constants.SPICE_CACERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()

def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)

def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family)

def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will either try to activate the IP address of the master
  (unless someone else has it) or also start the master daemons, based
  on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to start the master daemons
      (ganeti-masterd and ganeti-rapi), or (if false) activate the
      master ip
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  err_msgs = []
  # either start the master and rapi daemons
  if start_daemons:
    if no_voting:
      masterd_args = "--no-voting --yes-do-it"
    else:
      masterd_args = ""

    env = {
      "EXTRA_MASTERD_ARGS": masterd_args,
      }

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      msg = "Can't start Ganeti master: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)
  # or activate the IP
  else:
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
      if netutils.IPAddress.Own(master_ip):
        # we already have the ip:
        logging.debug("Master IP already configured, doing nothing")
      else:
        msg = "Someone else has the master ip, not activating"
        logging.error(msg)
        err_msgs.append(msg)
    else:
      ipcls = netutils.IP4Address
      if family == netutils.IP6Address.family:
        ipcls = netutils.IP6Address

      result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                             "%s/%d" % (master_ip, ipcls.iplen),
                             "dev", master_netdev, "label",
                             "%s:0" % master_netdev])
      if result.failed:
        msg = "Can't activate master IP: %s" % result.output
        logging.error(msg)
        err_msgs.append(msg)

      # we ignore the exit code of the following cmds
      if ipcls == netutils.IP4Address:
        utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                      master_ip, master_ip])
      elif ipcls == netutils.IP6Address:
        try:
          utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
        except errors.OpExecError:
          # TODO: Better error reporting
          logging.warning("Can't execute ndisc6, please install if missing")

  if err_msgs:
    _Fail("; ".join(err_msgs))

def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  ipcls = netutils.IP4Address
  if family == netutils.IP6Address.family:
    ipcls = netutils.IP6Address

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%d" % (master_ip, ipcls.iplen),
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                    " and error %s",
                    result.cmd, result.exit_code, result.output)

def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")

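# Illustrative calls to EtcHostsModify above (a sketch; the host name and the
# 192.0.2.x documentation address are made up):
#
#   EtcHostsModify(constants.ETC_HOSTS_ADD, "node3.example.com", "192.0.2.30")
#   EtcHostsModify(constants.ETC_HOSTS_REMOVE, "node3.example.com", None)
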
def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CACERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")

def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total amount of ram in MiB

  """
  outputarray = {}

  if vgname is not None:
    vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
    vg_free = vg_size = None
    if vginfo:
      vg_free = int(round(vginfo[0][0], 0))
      vg_size = int(round(vginfo[0][1], 0))
    outputarray["vg_size"] = vg_size
    outputarray["vg_free"] = vg_free

  if hypervisor_type is not None:
    hyper = hypervisor.GetHypervisor(hypervisor_type)
    hyp_info = hyper.GetNodeInfo()
    if hyp_info is not None:
      outputarray.update(hyp_info)

  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray

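# Shape of the GetNodeInfo result above (illustrative values only):
#
#   {"vg_size": 20480, "vg_free": 10240, "memory_dom0": 1024,
#    "memory_free": 3072, "memory_total": 4096,
#    "bootid": "b64b2a4e-..."}
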
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]
  return result

def GetBlockDevSizes(devices):
  """Return the size of the given block devices.

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs

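# blockdev --getsize64 prints a size in bytes; the integer division above
# converts it to whole MiB, truncating any partial mebibyte (illustrative,
# Python 2 integer division):
#
#   >>> int("134217728") / (1024 * 1024)
#   128
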
def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs

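# lv_attr decoding used above, worked through (a sketch; "-wi-ao" is a
# typical attr string for an active, open linear LV):
#
#   >>> attr = "-wi-ao"
#   >>> attr[0] == "v", attr[4] == "-", attr[5] == "o"
#   (False, False, True)
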
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with volume group names as keys and their
      sizes as values

  """
  return utils.ListVolumeGroups()

def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs

def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: None
  @raise RPCFail: if any of the given bridges is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))

def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query for information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results

def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output

def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: None
  @raise RPCFail: if the instance is not running

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)

def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output

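# Shape of the GetAllInstancesInfo result above (illustrative values only):
#
#   {"instance1.example.com":
#     {"memory": 512, "vcpus": 1, "state": "-b----", "time": 12.5}}
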
def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)

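# Example of a path produced by _InstanceLogName above (a sketch; the exact
# timestamp comes from utils.TimestampForFilename and the directory from
# constants.LOG_OS_DIR):
#
#   _InstanceLogName("import", "debootstrap", "inst1.example.com", "disk0")
#   -> "<LOG_OS_DIR>/import-debootstrap-inst1.example.com-disk0-<timestamp>.log"
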
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)

def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)

def _GetBlockDevSymlinkPath(instance_name, idx):
  return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
                        (instance_name, constants.DISK_SEPARATOR, idx))

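# Resulting link name (a sketch; constants.DISK_LINKS_DIR and
# constants.DISK_SEPARATOR are cluster constants, and a ":" separator is
# assumed here purely for illustration):
#
#   _GetBlockDevSymlinkPath("inst1.example.com", 0)
#   -> "<DISK_LINKS_DIR>/inst1.example.com:0"
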
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name

def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)

def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices

def StartInstance(instance, startup_paused):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)

def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded retry interval.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)

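# Shutdown cadence used above (a sketch of how utils.Retry is employed here):
# the _TryShutdown() callable is re-invoked roughly every 5 seconds for as
# long as it raises utils.RetryAgain; once `timeout` expires, utils.Retry
# raises utils.RetryTimeout and the forced StopInstance(force=True) path runs.
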
def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance, False)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)

def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info

def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)

def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)

def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)

def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)

def GetMigrationStatus(instance):
  """Get the migration status.

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)

def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id

def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)

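# Command line built above (a sketch): dd's seek= and count= are counted in
# blocks of bs= bytes, so passing the MiB-based offset and size directly
# assumes constants.WIPE_BLOCK_SIZE is one MiB:
#
#   dd if=/dev/zero seek=<offset> bs=<WIPE_BLOCK_SIZE> oflag=direct \
#      of=<path> count=<size>
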
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)

def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success

def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))

def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)
  else:
    result = True
  return result

def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


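# Illustrative sketch, not called anywhere in this module (the helper name
# is an assumption): BlockdevAssemble returns a device path on primary nodes
# but True on secondaries, so callers have to branch on the result type.
def _ExampleAssembleForInstance(instance_name, disks):
  for idx, disk in enumerate(disks):
    result = BlockdevAssemble(disk, instance_name, True, idx)
    if isinstance(result, basestring):
      logging.info("Disk %d of %s assembled at %s", idx, instance_name, result)

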
def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shut down. Then the children of the device are
  shut down.

  This function is called recursively. Note that, as opposed to
  assemble, we don't cache the children: shutting down a device
  doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


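# Illustrative sketch (assumed helper, not existing Ganeti code): pairing
# each disk with its (success, status) tuple from
# BlockdevGetmirrorstatusMulti and keeping only the ones still syncing.
def _ExampleDisksStillSyncing(disks):
  syncing = []
  for (disk, (ok, status)) in zip(disks, BlockdevGetmirrorstatusMulti(disks)):
    if ok and status.sync_percent is not None:
      syncing.append(disk)
  return syncing

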
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


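# Illustrative sketch (the helper name is an assumption): summing the sizes
# returned by BlockdevGetsize while skipping the None placeholders used for
# devices that could not be found.
def _ExampleTotalDiskSize(disks):
  return sum(size for size in BlockdevGetsize(disks) if size is not None)

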
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation of an LV device; we use
  # oflag=dsync to not buffer too much memory; this means that at
  # best, we flush every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


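# For illustration only (paths and sizes below are assumptions, not values
# this module produces verbatim): for a 1024 MiB disk the pipeline built
# above has roughly this shape, with the remote dd wrapped by the SSH
# runner:
#
#   set -e; set -o pipefail; dd if=/dev/xenvg/disk0 bs=1048576 count=1024 \
#     |ssh <dest_node> "dd of=/export/disk0 conv=nocreat,notrunc bs=65536 oflag=dsync"

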
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, "errno"):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail


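# Illustrative sketch, not part of the module (the helper name is an
# assumption): _ErrnoOrStr maps a missing-file error to its symbolic errno
# name instead of the full message.
def _ExampleErrnoOrStr():
  try:
    os.stat("/nonexistent-path")
  except EnvironmentError, err:
    return _ErrnoOrStr(err)  # returns "ENOENT"

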
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


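# Illustrative sketch (the helper name is an assumption): splitting the
# DiagnoseOS result tuples into valid OS names and the diagnostic messages
# of broken ones.
def _ExampleSplitOsList():
  valid = []
  broken = []
  for (name, _, status, diagnose, _, _, _) in DiagnoseOS():
    if status:
      valid.append(name)
    else:
      broken.append((name, diagnose))
  return (valid, broken)

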
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, _ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, _ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS API 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  return result


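# For illustration only (all concrete values below are assumptions): for an
# OS installed as "debootstrap" with a supported "testing" variant, calling
# OSCoreEnv("debootstrap+testing", inst_os, {"dhcp": "yes"}) would yield
# roughly:
#
#   {"OS_API_VERSION": "20", "OS_NAME": "debootstrap",
#    "DEBUG_LEVEL": "0", "OS_VARIANT": "testing", "OSP_DHCP": "yes"}

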
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS API 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def BlockdevGrow(disk, amount, dryrun):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to be resized.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


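# Illustrative sketch, not called by the module (the helper name is an
# assumption): snapshotting every disk of an instance; per the docstring
# above, each returned value identifies the snapshot volume of the leaf
# LVM device, even for DRBD disks, thanks to the recursion.
def _ExampleSnapshotAllDisks(instance):
  return [BlockdevSnapshot(disk) for disk in instance.disks]

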
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: a serialized form of the config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(constants.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the new logical/physical id we rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  fs_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
  return fs_dir


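# For illustration only (the base directory is an assumption): with a
# cluster-wide base of /srv/ganeti/file-storage,
#
#   _TransformFileStorageDir("/srv/ganeti/file-storage/inst1/disk0")
#
# returns the normalized path, while a path outside both bases, e.g.
# "/tmp/disk0", makes the function raise RPCFail via _Fail.

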
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


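# For illustration only (the queue directory value is an assumption): with
# QUEUE_DIR set to "/var/lib/ganeti/queue", the string-prefix check above
# accepts "/var/lib/ganeti/queue/job-1" but rejects "/tmp/job-1", whose
# common prefix with the queue directory is only "/".

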
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      which will contain the error details in case we
      failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(constants.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


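# For illustration only (directory and certificate names are assumptions):
#
#   _GetX509Filenames("/var/run/ganeti/crypto", "x509-20121004-abc123")
#
# would return the certificate directory plus the two well-known file names
# inside it:
#
#   ("/var/run/ganeti/crypto/x509-20121004-abc123",
#    "/var/run/ganeti/crypto/x509-20121004-abc123/key",
#    "/var/run/ganeti/crypto/x509-20121004-abc123/cert")

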
def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, constants.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will be mostly ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc
      # to not attempt truncation of an LV device; we use oflag=dsync to not
      # buffer too much memory; this means that at best, we flush every 64k,
      # which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


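# For illustration only: how the (env, prefix, suffix, exp_size) pieces are
# meant to wrap the transport run by the import/export daemon. The
# "<transport>" placeholder stands for whatever the daemon puts in the
# middle and is an assumption here, not something this module builds:
#
#   export, raw disk:  prefix = "dd if=... |"   ->  dd if=... | <transport>
#   import, raw disk:  suffix = "| dd of=..."   ->  <transport> | dd of=...
#   import, file:      suffix = "> /path"       ->  <transport> > /path

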
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Cannot specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = constants.NODED_CERT_FILE
    cert_path = constants.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      constants.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise

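# Usage sketch (illustrative, not called from this module): exports need a
# remote peer address, imports must not pass one; the return value is the
# opaque name accepted by the status/abort/cleanup functions below. The
# address, port and prepared opts/ieio values here are hypothetical.
#
#   name = StartImportExportDaemon(constants.IEM_EXPORT, opts,
#                                  "192.0.2.10", 1234, instance, "disk/0",
#                                  ieio, ieioargs)

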
def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: List with the state of each named import/export, or None if a
           status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result

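# Example return value (illustrative): for names ["export-disk0-...", "gone"]
# the result could look like [{...parsed status...}, None], i.e. one decoded
# JSON document per name, with None for any daemon whose status file is
# missing or still empty.

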
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)

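# Lifecycle sketch (illustrative): callers normally poll the status and
# always finish with cleanup; abort only cuts a transfer short. The loop
# below is a hypothetical example, not code used by ganeti itself.
#
#   name = StartImportExportDaemon(...)
#   try:
#     while GetImportExportStatus([name]) == [None]:
#       time.sleep(1)
#   except KeyboardInterrupt:
#     AbortImportExport(name)
#   finally:
#     CleanupImportExport(name)

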
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will
  # decide it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)

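# Note on the retry above (illustrative): the tuple given to utils.Retry is
# understood here as (initial delay, growth factor, maximum delay), with
# 2 * 60 seconds as the overall deadline. A minimal standalone sketch of the
# same backoff idea, independent of the ganeti helpers:
#
#   delay, deadline = 0.1, time.time() + 120
#   while True:
#     try:
#       _Attach()
#       break
#     except utils.RetryAgain:
#       if time.time() >= deadline:
#         raise utils.RetryTimeout()
#       time.sleep(delay)
#       delay = min(delay * 1.5, 5.0)

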
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)

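# Return value sketch (illustrative): the tuple is (alldone, min_resync),
# for example
#   (True, 100)  means every device is connected and fully synchronized
#   (False, 42)  means at least one device is still resyncing, the one
#                furthest behind being at 42 percent
# The percentages shown are hypothetical.

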
def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()

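# Pattern note (illustrative): the fork above is the usual "reply now, act
# later" trick; the parent answers the RPC immediately while the child
# performs the irreversible action. A minimal standalone sketch, with
# do_powercycle() as a hypothetical stand-in:
#
#   pid = os.fork()
#   if pid > 0:
#     return "scheduled"     # parent: report success to the caller
#   time.sleep(5)            # child: give the reply time to travel
#   do_powercycle()

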
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results

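# Example (illustrative): for hpath="instance-start" and the "pre" phase,
# scripts are collected from <HOOKS_BASE_DIR>/instance-start-pre.d/ and a
# run might return something like
#   [("instance-start-pre.d/01-check", constants.HKR_SUCCESS, ""),
#    ("instance-start-pre.d/02-audit", constants.HKR_FAIL, "disk full")]
# The hook path, script names and outputs here are hypothetical.

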
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script on success; failures
       are reported via L{_Fail}

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout

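# Usage sketch (illustrative): the allocator receives its JSON input as a
# temporary file passed in argv[1] and replies with JSON on stdout, so a
# caller might do roughly the following, assuming a prepared request dict
# and "hail" as an example allocator name:
#
#   stdout = IAllocatorRunner.Run("hail", serializer.DumpJson(request))
#   response = serializer.LoadJson(stdout)

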
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath

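  # Example (illustrative): "/dev/drbd0" maps to <BDEV_CACHE_DIR>/bdev_drbd0
  # and "/dev/mapper/vg-lv" maps to <BDEV_CACHE_DIR>/bdev_mapper_vg-lv.
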
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)