#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable-msg=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)


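# Illustrative usage sketch (dev_name and reason are hypothetical local
# variables, not part of this module):
#
#   _Fail("Can't find device %s", dev_name, exc=True)  # log with traceback
#   _Fail("Quiet failure: %s", reason, log=False)      # raise without logging

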
def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")


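# Illustrative round trip for the wire format _Decompress expects, assuming
# the RPC client packs payloads with the same constants:
#
#   packed = (constants.RPC_ENCODING_ZLIB_BASE64,
#             base64.b64encode(zlib.compress("some payload")))
#   assert _Decompress(packed) == "some payload"
#   assert _Decompress((constants.RPC_ENCODING_NONE, "raw")) == "raw"

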
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family)


def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will either try to activate the IP address of the
  master (unless someone else has it), or start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to start the master daemons
      (ganeti-masterd and ganeti-rapi), or (if false) activate the
      master ip
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  err_msgs = []
  # either start the master and rapi daemons
  if start_daemons:
    if no_voting:
      masterd_args = "--no-voting --yes-do-it"
    else:
      masterd_args = ""

    env = {
      "EXTRA_MASTERD_ARGS": masterd_args,
      }

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      msg = "Can't start Ganeti master: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)
  # or activate the IP
  else:
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
      if netutils.IPAddress.Own(master_ip):
        # we already have the ip:
        logging.debug("Master IP already configured, doing nothing")
      else:
        msg = "Someone else has the master ip, not activating"
        logging.error(msg)
        err_msgs.append(msg)
    else:
      ipcls = netutils.IP4Address
      if family == netutils.IP6Address.family:
        ipcls = netutils.IP6Address

      result = utils.RunCmd(["ip", "address", "add",
                             "%s/%d" % (master_ip, ipcls.iplen),
                             "dev", master_netdev, "label",
                             "%s:0" % master_netdev])
      if result.failed:
        msg = "Can't activate master IP: %s" % result.output
        logging.error(msg)
        err_msgs.append(msg)

      # we ignore the exit code of the following cmds
      if ipcls == netutils.IP4Address:
        utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                      master_ip, master_ip])
      elif ipcls == netutils.IP6Address:
        try:
          utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
        except errors.OpExecError:
          # TODO: Better error reporting
          logging.warning("Can't execute ndisc6, please install if missing")

  if err_msgs:
    _Fail("; ".join(err_msgs))


def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  ipcls = netutils.IP4Address
  if family == netutils.IP6Address.family:
    ipcls = netutils.IP6Address

  result = utils.RunCmd(["ip", "address", "del",
                         "%s/%d" % (master_ip, ipcls.iplen),
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                    " and error %s",
                    result.cmd, result.exit_code, result.output)


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable-msg=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')


def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total amount of ram in MiB

  """
  outputarray = {}

  vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
  vg_free = vg_size = None
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))

  outputarray['vg_size'] = vg_size
  outputarray['vg_free'] = vg_free

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_LVLIST in what:
    try:
      val = GetVolumeList(what[constants.NV_LVLIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what:
    result[constants.NV_OSLIST] = DiagnoseOS()

  return result


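# Illustrative sketch of a minimal `what` dict a caller might pass to
# VerifyNode (node and cluster names are hypothetical):
#
#   what = {
#     constants.NV_FILELIST: [constants.CLUSTER_CONF_FILE],
#     constants.NV_NODELIST: ["node2.example.com"],
#     constants.NV_VERSION: None,
#     }
#   result = VerifyNode(what, "cluster.example.com")
#   # result[constants.NV_VERSION] == (PROTOCOL_VERSION, RELEASE_VERSION)

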
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[name] = (size, inactive, online)

  return lvs


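# Illustrative parse, assuming a typical line from the lvs invocation above:
#
#   "  data-vol|1024.00|-wi-ao"
#
# yields lvs["data-vol"] == ("1024.00", False, True): attr[4] != '-' means
# the LV is active, and attr[5] == 'o' means it is open.

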
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with volume group names as keys and their
      sizes as values

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split('(')[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{'name': line[0], 'size': line[1],
             'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count('|') >= 3:
      all_devs.extend(map_line(line.split('|')))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: None
  @raise RPCFail: if any of the bridges is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query for information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Check whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: None
  @raise RPCFail: if the instance is not running

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  base = ("%s-%s-%s-%s.log" %
          (kind, os_name, instance, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)


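# Illustrative result, assuming utils.TimestampForFilename() yields a
# timestamp such as "2010-08-18_15_05_31": an "add" of the hypothetical
# instance inst1.example.com with the "debootstrap" OS would be logged to
#   <LOG_OS_DIR>/add-debootstrap-inst1.example.com-2010-08-18_15_05_31.log

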
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx):
  return utils.PathJoin(constants.DISK_LINKS_DIR,
                        "%s:%d" % (instance_name, idx))


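# Illustrative example: disk 0 of the hypothetical instance
# inst1.example.com maps to "<DISK_LINKS_DIR>/inst1.example.com:0".

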
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices


def StartInstance(instance):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


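# Illustrative flow, assuming utils.Retry(fn, 5, timeout) re-invokes
# _TryShutdown() roughly every 5 seconds: the soft stop is retried until the
# instance disappears from the hypervisor or `timeout` expires, and only
# then does the code above fall back to a forced stop (destroy) followed by
# the cleanup steps.

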
def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigration(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigration(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration: %s", err, exc=True)


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @rtype: None
  @raise RPCFail: if the migration fails for any reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete 'size' argument
  # pylint: disable-msg=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable-msg=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path):
  """This function actually wipes the device.

  @param path: The path to the device to wipe

  """
  result = utils.RunCmd("%s%s" % (constants.WIPE_CMD, utils.ShellQuote(path)))

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)


def BlockdevWipe(disk):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Cannot execute wipe for device %s: %s", disk, err)

  if rdev is None:
    _Fail("Cannot execute wipe for device %s: device not found", disk)

  _WipeDevice(rdev.dev_path)


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


def BlockdevAssemble(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable-msg=E1103
      result = result.dev_path
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such; as opposed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly be ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncating an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


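# An illustrative sketch of the pipeline BlockdevExport builds above, for a
# hypothetical 1 GiB LV (device path, destination node and path are
# assumptions, and the exact SSH invocation comes from _GetSshRunner):
#
#   set -e; set -o pipefail; dd if=/dev/xenvg/disk0 bs=1048576 count=1024 \
#     | ssh <dest_node> 'dd of=/var/tmp/disk0 conv=nocreat,notrunc \
#       bs=65536 oflag=dsync'

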
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, 'errno'):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail


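# For example, _ErrnoOrStr relies on the stdlib errno table: an ENOENT
# failure such as IOError(errno.ENOENT, "No such file") has err.errno == 2
# on Linux, and errno.errorcode[errno.ENOENT] == 'ENOENT', so the function
# returns 'ENOENT'; an exception without an errno attribute falls back to
# its str() representation.

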
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


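# Illustrative example for _OSOndiskAPIVersion (directory and file contents
# are hypothetical): if the API version file under /srv/ganeti/os/mycustomos
# contains the two lines "15" and "20", the function returns
# (True, [15, 20]); a missing or unreadable file yields
# (False, "Required file ...").

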
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path names
  os_files = dict.fromkeys(constants.OS_SCRIPTS)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = ''

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = ''
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for filename in os_files:
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS variants file at %s: %s" %
                     (variants_file, _ErrnoOrStr(err)))
    if not variants:
      return False, ("No supported OS variant found")

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, _ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['OS_NAME'] = inst_os.name
  result['DEBUG_LEVEL'] = '%d' % debug

  # OS variants
  if api_version >= constants.OS_API_V15:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result['OSP_%s' % pname.upper()] = pvalue

  return result


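# A sketch of the dictionary OSCoreEnv returns for a hypothetical OS
# "debootstrap+default" at API version 20 with os_params {"dhcp": "yes"}:
#
#   {'OS_API_VERSION': '20', 'OS_NAME': 'debootstrap', 'DEBUG_LEVEL': '0',
#    'OS_VARIANT': 'default', 'OSP_DHCP': 'yes'}

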
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


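# Continuing the sketch for OSEnvironment, an instance with one LV-backed
# disk and one bridged NIC would additionally get entries along these
# lines (all values hypothetical):
#
#   {'INSTANCE_NAME': 'inst1.example.com', 'DISK_COUNT': '1',
#    'DISK_0_PATH': '/dev/xenvg/disk0', 'DISK_0_ACCESS': 'rw',
#    'DISK_0_BACKEND_TYPE': 'block', 'NIC_COUNT': '1',
#    'NIC_0_MAC': 'aa:00:00:11:22:33', 'NIC_0_MODE': 'bridged',
#    'NIC_0_BRIDGE': 'xen-br0', 'NIC_0_LINK': 'xen-br0', ...}

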
def BlockdevGrow(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the children being
  resized first.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @rtype: None
  @raise RPCFail: in case of errors, with the error message

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
                 '%s' % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


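# A sketch of the export configuration file FinalizeExport writes, assuming
# INISECT_EXP and INISECT_INS are "export" and "instance" (all values
# hypothetical):
#
#   [export]
#   version = 0
#   timestamp = 1325376000
#   source = node1.example.com
#   os = debootstrap
#   compression = gzip
#
#   [instance]
#   name = inst1.example.com
#   memory = 512
#   vcpus = 1
#   disk_template = drbd
#   nic_count = 1
#   disk_count = 1

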
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: a serialized form of the config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(constants.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the unique_id we rename it to
  @rtype: None
  @raise RPCFail: if any of the renames failed, with the accumulated
      error messages

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
      base_file_storage_dir):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir


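# Note on the check in _TransformFileStorageDir: os.path.commonprefix
# compares strings character by character, not path component by path
# component. With hypothetical paths:
#
#   os.path.commonprefix(["/srv/ganeti/file-storage/inst1",
#                         "/srv/ganeti/file-storage"])
#
# returns '/srv/ganeti/file-storage' and the path is accepted; note that a
# sibling such as "/srv/ganeti/file-storage-other" shares the same
# character prefix and would pass the comparison as well.

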
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: None
  @raise RPCFail: if the directory could not be created

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty; otherwise the operation fails.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should clean up

  @rtype: None
  @raise RPCFail: if the directory is not empty or cannot be removed

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to

  @rtype: None
  @raise RPCFail: if the rename failed

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: None

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None

  """
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: None
  @raise RPCFail: if any of the devices cannot be found or closed,
      with the error details in the message

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(constants.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    directory = os.path.normpath(os.path.dirname(filename))

    if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
        constants.EXPORT_DIR):
      _Fail("File '%s' is not under exports directory '%s'",
            filename, constants.EXPORT_DIR)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly be ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc
      # to not attempt truncating an LV device; we use oflag=dsync to not
      # buffer too much memory; this means that at best, we flush every 64k,
      # which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


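# Illustrative (env, prefix, suffix, exp_size) tuples returned by
# _GetImportExportIoCommand (paths and sizes hypothetical):
#
#   - IEIO_FILE export of /srv/ganeti/export/inst1.new/disk0.snap:
#       (None, None, "< /srv/ganeti/export/inst1.new/disk0.snap", 1024.0)
#   - IEIO_RAW_DISK export of a 1024 MiB disk:
#       (None, "dd if=/dev/xenvg/disk0 bs=1048576 count=1024 |", None, 1024)

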
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = constants.NODED_CERT_FILE
    cert_path = constants.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir(prefix)
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      constants.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    logfile = _InstanceLogName(prefix, instance.os, instance.name)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


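# A sketch of the daemon invocation StartImportExportDaemon assembles for a
# hypothetical export (the status directory name, host, port and
# certificate paths are assumptions):
#
#   [constants.IMPORT_EXPORT_DAEMON, ".../export-<ts>-XXXX/status", "export",
#    "--key=<key_path>", "--cert=<cert_path>", "--ca=.../export-<ts>-XXXX/ca",
#    "--host=node2.example.com", "--port=1234", "--expected-size=1024",
#    "--cmd-prefix=dd if=/dev/xenvg/disk0 bs=1048576 count=1024 |"]

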
def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if
           a status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


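# As the comment in DrbdAttachNet above indicates, the (0.1, 1.5, 5.0)
# delay specification passed to utils.Retry reads as (start, factor,
# maximum): attempts are spaced 0.1s, then 0.15s, 0.225s and so on, capped
# at 5.0s per attempt, within an overall budget of 2 * 60 = 120 seconds.

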
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable-msg=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, one of L{constants.HKR_SUCCESS},
        L{constants.HKR_FAIL} or L{constants.HKR_SKIP}
      - output of the script

    @raise RPCFail: for an unknown hooks phase

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
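

# Minimal usage sketch for HooksRunner; the hook path and the environment
# values below are made-up examples:
#
#   runner = HooksRunner()
#   results = runner.RunHooks("instance-start", constants.HOOKS_PHASE_PRE,
#                             {"GANETI_INSTANCE_NAME": "inst1.example.com"})
#   for script, status, output in results:
#     logging.info("hook %s: %s (%s)", script, status, output)

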
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script (failures are
        reported by raising L{RPCFail})

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
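

# Minimal usage sketch for IAllocatorRunner; the allocator name and the
# request payload are made-up examples:
#
#   idata = serializer.DumpJson(request_dict)  # request_dict is hypothetical
#   stdout = IAllocatorRunner.Run("dumb-allocator", idata)
#   response = serializer.LoadJson(stdout)

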
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath
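
  # Worked example for _ConvertPath (the device path is illustrative):
  #   "/dev/xenvg/disk0" -> constants.BDEV_CACHE_DIR + "/bdev_xenvg_disk0"
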
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache for %s: %s",
                        dev_path, err)
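

# Minimal usage sketch for DevCacheManager; the device path, owner and
# iv_name are made-up examples:
#
#   DevCacheManager.UpdateCache("/dev/drbd0", "inst1.example.com",
#                               True, "disk/0")
#   # ...and once the device is shut down:
#   DevCacheManager.RemoveCache("/dev/drbd0")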