#
#

# Copyright (C) 2006, 2007, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable-msg=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
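
# Note on _Fail() above: a typical call from the RPC entry points below
# is, as an illustrative sketch:
#   _Fail("Can't find device %s", disk, exc=True)
# which logs the formatted message (with a traceback when exc=True is
# passed) and raises RPCFail, which ganeti-noded then turns into a
# 'failed' RPC result for the master.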


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
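
# Note on _Decompress() above: the two payload shapes it accepts are,
# as an illustrative sketch of what the RPC client sends:
#   (constants.RPC_ENCODING_NONE, "raw data")
#   (constants.RPC_ENCODING_ZLIB_BASE64,
#    base64.b64encode(zlib.compress("raw data")))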


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_node, primary_ip_family
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family)


def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will either try to activate the IP address of the master
  (unless someone else has it) or also start the master daemons, based
  on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to start the master daemons
      (ganeti-masterd and ganeti-rapi), or (if false) activate the
      master ip
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  err_msgs = []
  # either start the master and rapi daemons
  if start_daemons:
    if no_voting:
      masterd_args = "--no-voting --yes-do-it"
    else:
      masterd_args = ""

    env = {
      "EXTRA_MASTERD_ARGS": masterd_args,
      }

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      msg = "Can't start Ganeti master: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)
  # or activate the IP
  else:
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
      if netutils.IPAddress.Own(master_ip):
        # we already have the ip:
        logging.debug("Master IP already configured, doing nothing")
      else:
        msg = "Someone else has the master ip, not activating"
        logging.error(msg)
        err_msgs.append(msg)
    else:
      ipcls = netutils.IP4Address
      if family == netutils.IP6Address.family:
        ipcls = netutils.IP6Address

      result = utils.RunCmd(["ip", "address", "add",
                             "%s/%d" % (master_ip, ipcls.iplen),
                             "dev", master_netdev, "label",
                             "%s:0" % master_netdev])
      if result.failed:
        msg = "Can't activate master IP: %s" % result.output
        logging.error(msg)
        err_msgs.append(msg)

      # we ignore the exit code of the following cmds
      if ipcls == netutils.IP4Address:
        utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                      master_ip, master_ip])
      elif ipcls == netutils.IP6Address:
        try:
          utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
        except errors.OpExecError:
          # TODO: Better error reporting
          logging.warning("Can't execute ndisc6, please install if missing")

  if err_msgs:
    _Fail("; ".join(err_msgs))
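
# Note on StartMaster() above: with illustrative values, the IP
# activation branch runs something like
#   ip address add 192.0.2.1/32 dev eth0 label eth0:0
# (ipcls.iplen is 32 for IPv4, 128 for IPv6), then issues the
# gratuitous arping/ndisc6 calls so that neighbours refresh their
# caches for the relocated master IP.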


def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  ipcls = netutils.IP4Address
  if family == netutils.IP6Address.family:
    ipcls = netutils.IP6Address

  result = utils.RunCmd(["ip", "address", "del",
                         "%s/%d" % (master_ip, ipcls.iplen),
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                    " and error %s",
                    result.cmd, result.exit_code, result.output)


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate in: either add or remove an entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shut down the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable-msg=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')


def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) RAM in MiB
      - memory_total is the total amount of RAM in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_LVLIST in what:
    try:
      val = GetVolumeList(what[constants.NV_LVLIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what:
    result[constants.NV_OSLIST] = DiagnoseOS()

  return result
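
# Note on VerifyNode() above: an illustrative (not exhaustive) "what"
# argument could look like
#   {constants.NV_FILELIST: ["/etc/hosts"],
#    constants.NV_NODELIST: ["node2.example.com"],
#    constants.NV_VERSION: None}
# and the returned dict reuses the same keys with the check results.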


def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[name] = (size, inactive, online)

  return lvs
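
# Note on GetVolumeList() above: an illustrative lvs line it parses is
#   "  lv0|1024.00|-wi-ao"
# where the six-character lv_attr field is indexed as: [0] volume type
# ('v' marks virtual volumes, which are skipped), [4] state ('-' means
# inactive) and [5] device open ('o' means online).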


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with volume group names as keys and their
      sizes as values

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors querying lvs, an RPC failure is raised; strange
    output lines are logged and skipped.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split('(')[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{'name': line[0], 'size': line[1],
             'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count('|') >= 3:
      all_devs.extend(map_line(line.split('|')))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs
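
# Note on NodeVolumes() above: an illustrative lvs line such as
#   "  lv0|1024.00|/dev/sda1(0),/dev/sdb1(0)|xenvg"
# is expanded by map_line() into one dict per physical device, which is
# why an LV spanning several PVs appears multiple times in the result.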


def BridgesExist(bridges_list):
  """Check whether the given bridges exist on the current node.

  @raise RPCFail: if any of the bridges is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query for instance
      information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output
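
# Note on GetInstanceInfo() above: the iinfo indices 2, 4 and 5 follow
# the hypervisor GetInstanceInfo() tuple layout also unpacked in
# GetAllInstancesInfo() below: (name, id, memory, vcpus, state, time).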


def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @raise RPCFail: if the instance is not running; missing disk
      symlinks are only logged as warnings

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance):
849
  """Compute the OS log filename for a given instance and operation.
850

851
  The instance name and os name are passed in as strings since not all
852
  operations have these as part of an instance object.
853

854
  @type kind: string
855
  @param kind: the operation type (e.g. add, import, etc.)
856
  @type os_name: string
857
  @param os_name: the os name
858
  @type instance: string
859
  @param instance: the name of the instance being imported/added/etc.
860

861
  """
862
  # TODO: Use tempfile.mkstemp to create unique filename
863
  base = ("%s-%s-%s-%s.log" %
864
          (kind, os_name, instance, utils.TimestampForFilename()))
865
  return utils.PathJoin(constants.LOG_OS_DIR, base)
866

    
867

    
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
901
  """Run the OS rename script for an instance.
902

903
  @type instance: L{objects.Instance}
904
  @param instance: Instance whose OS is to be installed
905
  @type old_name: string
906
  @param old_name: previous instance name
907
  @type debug: integer
908
  @param debug: debug level, passed to the OS scripts
909
  @rtype: boolean
910
  @return: the success of the operation
911

912
  """
913
  inst_os = OSFromDisk(instance.os)
914

    
915
  rename_env = OSEnvironment(instance, inst_os, debug)
916
  rename_env['OLD_INSTANCE_NAME'] = old_name
917

    
918
  logfile = _InstanceLogName("rename", instance.os,
919
                             "%s-%s" % (old_name, instance.name))
920

    
921
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
922
                        cwd=inst_os.path, output=logfile)
923

    
924
  if result.failed:
925
    logging.error("os create command '%s' returned error: %s output: %s",
926
                  result.cmd, result.fail_reason, result.output)
927
    lines = [utils.SafeEncode(val)
928
             for val in utils.TailFile(logfile, lines=20)]
929
    _Fail("OS rename script failed (%s), last lines in the"
930
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
931

    
932

    
933
def _GetVGInfo(vg_name):
934
  """Get information about the volume group.
935

936
  @type vg_name: str
937
  @param vg_name: the volume group which we query
938
  @rtype: dict
939
  @return:
940
    A dictionary with the following keys:
941
      - C{vg_size} is the total size of the volume group in MiB
942
      - C{vg_free} is the free size of the volume group in MiB
943
      - C{pv_count} are the number of physical disks in that VG
944

945
    If an error occurs during gathering of data, we return the same dict
946
    with keys all set to None.
947

948
  """
949
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
950

    
951
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
952
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
953

    
954
  if retval.failed:
955
    logging.error("volume group %s not present", vg_name)
956
    return retdic
957
  valarr = retval.stdout.strip().rstrip(':').split(':')
958
  if len(valarr) == 3:
959
    try:
960
      retdic = {
961
        "vg_size": int(round(float(valarr[0]), 0)),
962
        "vg_free": int(round(float(valarr[1]), 0)),
963
        "pv_count": int(valarr[2]),
964
        }
965
    except (TypeError, ValueError), err:
966
      logging.exception("Fail to parse vgs output: %s", err)
967
  else:
968
    logging.error("vgs output has the wrong number of fields (expected"
969
                  " three): %s", str(valarr))
970
  return retdic
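
# Note on _GetVGInfo() above: with illustrative values, the vgs call
# prints "  102400.00:51200.00:2" (a trailing ':' is tolerated via
# rstrip), which parses into
#   {"vg_size": 102400, "vg_free": 51200, "pv_count": 2}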


def _GetBlockDevSymlinkPath(instance_name, idx):
  return utils.PathJoin(constants.DISK_LINKS_DIR,
                        "%s:%d" % (instance_name, idx))


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices


def StartInstance(instance):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded retry interval.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shut down instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)
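
# Note on InstanceShutdown() above: utils.Retry() keeps invoking the
# _TryShutdown instance (5 being the delay between attempts) for up to
# "timeout" seconds; raising utils.RetryAgain continues the loop,
# returning ends it, and utils.RetryTimeout marks the point where the
# forced-shutdown path takes over.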


def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigration(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigration(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration: %s", err, exc=True)


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @rtype: tuple
  @return: a tuple of (success, msg) where:
      - success is a boolean denoting the success/failure of the operation
      - msg is a string with details in case of failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete 'size' argument
  # pylint: disable-msg=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable-msg=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


def BlockdevAssemble(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable-msg=E1103
      result = result.dev_path
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shut down. Then the children of the device are
  shut down.

  This function is called recursively. Note that, as opposed to
  assembly, we don't cache the children: shutting down a device
  doesn't require that the devices above it were active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shut down
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk
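
# Note on _RecursiveFindBD() above: the lookup mirrors the disk.children
# tree, e.g. for a DRBD8 disk the children are its two underlying
# logical volumes (data and metadata), each resolved the same way.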


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


1606
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly be ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use
  # oflag=dsync to not buffer too much memory; this means that at
  # best, we flush every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


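# Illustrative sketch of the pipeline assembled above (device, host and path
# are hypothetical):
#   set -e; set -o pipefail; dd if=/dev/xenvg/disk0 bs=1048576 count=1024 \
#     | ssh <opts> node2.example.com 'dd of=/dest/path conv=nocreat,notrunc bs=65536 oflag=dsync'
# i.e. a read-side dd on this node streamed over SSH into a write-side dd on
# the destination node.

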
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, 'errno'):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


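# Illustrative example: an API file containing the two lines "15" and "20"
# makes this function return (True, [15, 20]), while a missing file yields
# (False, "Required file '...' not found under path ...") with the exact
# file name taken from constants.OS_API_FILE.

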
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


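# Illustrative example of a single result entry for a valid OS (all values
# hypothetical):
#   ("debootstrap", "/srv/ganeti/os/debootstrap", True, "",
#    ["squeeze", "wheezy"], [["mirror", "APT mirror to use"]], [20])
# For an invalid OS, status is False, the error text sits in the diagnose
# slot and the remaining lists are empty.

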
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path names
  os_files = dict.fromkeys(constants.OS_SCRIPTS)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = ''

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = ''
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for filename in os_files:
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS variants file at %s: %s" %
                     (variants_file, _ErrnoOrStr(err)))
    if not variants:
      return False, ("No supported os variant found")

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, _ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = name.split("+", 1)[0]
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['OS_NAME'] = inst_os.name
  result['DEBUG_LEVEL'] = '%d' % debug

  # OS variants
  if api_version >= constants.OS_API_V15:
    try:
      variant = inst_os.name.split('+', 1)[1]
    except IndexError:
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result['OSP_%s' % pname.upper()] = pvalue

  return result


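# Illustrative example (hypothetical values): for an OS instance whose name
# carries a variant suffix, e.g. "debootstrap+wheezy", with API version 20
# and os_params {"mirror": "http://deb.example.com"}, the result contains:
#   OS_API_VERSION=20, OS_NAME=debootstrap+wheezy, DEBUG_LEVEL=0,
#   OS_VARIANT=wheezy, OSP_MIRROR=http://deb.example.com

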
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(inst_os, instance.osparams, debug=debug)

  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


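# Illustrative example (hypothetical values): a one-disk, one-NIC instance
# adds, on top of the OSCoreEnv variables, entries such as:
#   INSTANCE_NAME=web1.example.com, DISK_COUNT=1, NIC_COUNT=1,
#   DISK_0_PATH=/dev/drbd0, DISK_0_ACCESS=rw, DISK_0_BACKEND_TYPE=block,
#   NIC_0_MAC=aa:00:00:11:22:33, NIC_0_MODE=bridged, NIC_0_BRIDGE=xen-br0
# plus one INSTANCE_BE_*/INSTANCE_HV_* entry per backend/hypervisor param.

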
def BlockdevGrow(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @rtype: (status, result)
  @return: a tuple with the status of the operation
      (True/False), and the error message if status
      is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


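# Illustrative note: for a DRBD8 disk the recursion above descends into
# disk.children[0] (the data LV) and snapshots that, so the returned value
# is ultimately an LVM snapshot path such as "/dev/xenvg/disk0.snap" (path
# hypothetical).

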
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
                 '%s' % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


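# Illustrative sketch of the resulting export config file (section and key
# names come from constants.INISECT_*; all values are hypothetical):
#   [export]
#   version = 0
#   timestamp = 1300000000
#   source = node1.example.com
#   os = debootstrap
#   compression = gzip
#   [instance]
#   name = web1.example.com
#   memory = 512
#   ...

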
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(constants.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the name we rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
      base_file_storage_dir):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir


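# Illustrative example (paths hypothetical): with a base directory of
# "/srv/ganeti/file-storage", a path such as "/srv/ganeti/file-storage/inst1"
# is returned normalized, while "/srv/other" fails the commonprefix check
# above and raises RPCFail.

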
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified Storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


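# Illustrative example (paths hypothetical): with constants.QUEUE_DIR set to
# "/var/lib/ganeti/queue", a name like "/var/lib/ganeti/queue/job-42" passes
# the check, while "/tmp/job-42" raises RPCFail.

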
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      which will contain the error details in case we
      failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = osname.split("+", 1)[0]
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(constants.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    directory = os.path.normpath(os.path.dirname(filename))

    if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
        constants.EXPORT_DIR):
      _Fail("File '%s' is not under exports directory '%s'",
            filename, constants.EXPORT_DIR)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly be ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc
      # to not attempt truncation on an LV device; we use oflag=dsync to not
      # buffer too much memory; this means that at best, we flush every 64k,
      # which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


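# Illustrative example (values hypothetical): exporting a 10 GiB (10240 MiB)
# disk as a raw device returns roughly
#   (None, "dd if=/dev/drbd0 bs=1048576 count=10240 |", None, 10240)
# i.e. no extra environment, a read-side dd prefix, no suffix, and the
# expected size in mebibytes.

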
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = constants.NODED_CERT_FILE
    cert_path = constants.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir(prefix)
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      constants.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    logfile = _InstanceLogName(prefix, instance.os, instance.name)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None
           if a status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


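# Illustrative note on the retry schedule above: with the (0.1, 1.5, 5.0)
# delay tuple, utils.Retry starts at 100 milliseconds, multiplies the delay
# by 1.5 after each attempt and caps it at 5 seconds, so the waits run
# roughly 0.1, 0.15, 0.225, ... up to 5.0 seconds until the two-minute
# overall timeout expires.

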
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable-msg=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
3092
  """Hook runner.
3093

3094
  This class is instantiated on the node side (ganeti-noded) and not
3095
  on the master side.
3096

3097
  """
3098
  def __init__(self, hooks_base_dir=None):
3099
    """Constructor for hooks runner.
3100

3101
    @type hooks_base_dir: str or None
3102
    @param hooks_base_dir: if not None, this overrides the
3103
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
3104

3105
    """
3106
    if hooks_base_dir is None:
3107
      hooks_base_dir = constants.HOOKS_BASE_DIR
3108
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
3109
    # constant
3110
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3111

    
3112
  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS},
        L{constants.HKR_FAIL} or L{constants.HKR_SKIP}
      - output of the script

    @raise RPCFail: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


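# Usage sketch (illustrative; the hook path and environment below are
# example values, not taken from this file):
#
#   runner = HooksRunner()
#   env = {"GANETI_HOOKS_VERSION": "2"}
#   for (script, status, output) in runner.RunHooks(
#       "instance-start", constants.HOOKS_PHASE_PRE, env):
#     if status == constants.HKR_FAIL:
#       logging.error("hook %s failed: %s", script, output)

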
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script; failures are reported
        by raising an L{RPCFail} exception

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


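# Usage sketch (illustrative; "hail" is an example allocator name, and
# the JSON round-trip is an assumption about the caller, not this file):
#
#   out = IAllocatorRunner.Run("hail", serializer.DumpJson(request))
#   answer = serializer.LoadJson(out)

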
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath

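  # Worked examples (derived from the code above):
  #   _ConvertPath("/dev/drbd0")       -> _ROOT_DIR + "/bdev_drbd0"
  #   _ConvertPath("/dev/xenvg/disk0") -> _ROOT_DIR + "/bdev_xenvg_disk0"
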
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

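  # Example cache file content (illustrative, derived from the format
  # string above): for instance "inst1.example.com" holding the device
  # as visible disk "sda" on its primary node, the file would contain:
  #   inst1.example.com primary sda
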
  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache file for %s: %s",
                        dev_path, err)