Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 360b0dc2

History | View | Annotate | Download (81 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26

27
"""
28

    
29

    
30
import os
31
import os.path
32
import shutil
33
import time
34
import stat
35
import errno
36
import re
37
import subprocess
38
import random
39
import logging
40
import tempfile
41
import zlib
42
import base64
43

    
44
from ganeti import errors
45
from ganeti import utils
46
from ganeti import ssh
47
from ganeti import hypervisor
48
from ganeti import constants
49
from ganeti import bdev
50
from ganeti import objects
51
from ganeti import ssconf
52

    
53

    
54
def _GetConfig():
  """Wrapper around the ssconf configuration store.

  @rtype: L{ssconf.SimpleStore}
  @return: a freshly constructed SimpleStore instance

  """
  return ssconf.SimpleStore()
62

    
63

    
64
def _GetSshRunner(cluster_name):
  """Build an SshRunner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, required by the
      SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a new SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
75

    
76

    
77
def _Decompress(data):
  """Unpack a payload that was packed by the RPC client.

  @type data: list or tuple
  @param data: two-element sequence of (encoding, content)
  @rtype: str
  @return: the raw, decompressed content

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  # plain passthrough first, as it is the cheap common case
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
95

    
96

    
97
def _CleanDirectory(path, exclude=None):
  """Delete all regular files directly inside a directory.

  Symlinks and subdirectories are never touched.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: paths that must be preserved; defaults to the
      empty list

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # normalize so the membership test below is reliable
    exclude = [os.path.normpath(entry) for entry in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    # skip anything that is a symlink or not a plain file
    if os.path.islink(full_name) or not os.path.isfile(full_name):
      continue
    utils.RemoveFile(full_name)
121

    
122

    
123
def _BuildUploadFileList():
  """Compute the set of files accepted for upload.

  Kept in a function so the set is built exactly once, at module
  import time.

  """
  allowed = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]
  return frozenset(allowed)
135

    
136

    
137
# Files accepted by the UploadFile function; computed once at import time
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
138

    
139

    
140
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  The job queue lock file is explicitly excluded from the cleanup
  (presumably because it may be held by a running daemon — confirm
  against callers).

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
148

    
149

    
150
def GetMasterInfo():
  """Return the master netdev, IP and node name.

  Utility function to compute master information, either for
  consumption here or from the node daemon.

  @rtype: tuple
  @return: (master_netdev, master_ip, master_name) with a good
      configuration, otherwise (None, None, None)

  """
  try:
    cfg = _GetConfig()
    info = (cfg.GetMasterNetdev(), cfg.GetMasterIP(), cfg.GetMasterNode())
  except errors.ConfigurationError:
    logging.exception("Cluster configuration incomplete")
    info = (None, None, None)
  return info
170

    
171

    
172
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try to activate the IP address of the
  master (unless someone else holds it). Depending on the
  start_daemons argument, it will also start the master daemons.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @rtype: boolean
  @return: whether all activation steps succeeded

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  ok = True
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in ("ganeti-masterd", "ganeti-rapi"):
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
217

    
218

    
219
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function always tries to remove the master IP address; if
  stop_daemons is set, the master daemons are stopped as well.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: boolean
  @return: False if the master information cannot be computed,
      True otherwise (IP removal failures are logged but ignored)

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in (constants.RAPI_PID, constants.MASTERD_PID):
      pid = utils.ReadPidFile(utils.DaemonPidFileName(daemon))
      utils.KillProcess(pid)

  return True
248

    
249

    
250
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
251
  """Joins this node to the cluster.
252

253
  This does the following:
254
      - updates the hostkeys of the machine (rsa and dsa)
255
      - adds the ssh private key to the user
256
      - adds the ssh public key to the users' authorized_keys file
257

258
  @type dsa: str
259
  @param dsa: the DSA private key to write
260
  @type dsapub: str
261
  @param dsapub: the DSA public key to write
262
  @type rsa: str
263
  @param rsa: the RSA private key to write
264
  @type rsapub: str
265
  @param rsapub: the RSA public key to write
266
  @type sshkey: str
267
  @param sshkey: the SSH private key to write
268
  @type sshpub: str
269
  @param sshpub: the SSH public key to write
270
  @rtype: boolean
271
  @return: the success of the operation
272

273
  """
274
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
275
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
276
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
277
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
278
  for name, content, mode in sshd_keys:
279
    utils.WriteFile(name, data=content, mode=mode)
280

    
281
  try:
282
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
283
                                                    mkdir=True)
284
  except errors.OpExecError, err:
285
    msg = "Error while processing user ssh files"
286
    logging.exception(msg)
287
    return (False, "%s: %s" % (msg, err))
288

    
289
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
290
    utils.WriteFile(name, data=content, mode=0600)
291

    
292
  utils.AddAuthorizedKey(auth_keys, sshpub)
293

    
294
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
295

    
296
  return (True, "Node added successfully")
297

    
298

    
299
def LeaveCluster():
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  pub_file = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, pub_file.read(8192))
  finally:
    pub_file.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
330

    
331

    
332
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  node_info = {}

  vginfo = _GetVGInfo(vgname)
  node_info['vg_size'] = vginfo['vg_size']
  node_info['vg_free'] = vginfo['vg_free']

  # merge in the hypervisor's view (memory figures etc.)
  hyp_info = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hyp_info is not None:
    node_info.update(hyp_info)

  boot_id_file = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    node_info["bootid"] = boot_id_file.read(128).rstrip("\n")
  finally:
    boot_id_file.close()

  return node_info
366

    
367

    
368
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: str
  @param cluster_name: the cluster's name, passed to the SshRunner
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  # every check below is optional: it runs only if its NV_* key is
  # present in the input dict

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # randomize the order of the ssh checks (note: this shuffles the
    # caller's list in place)
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        # only failures are recorded; an empty dict means all passed
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # locate our own primary/secondary IPs in the list first, so we
    # can use them as source addresses for the pings below
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # only test the secondary IP when it differs from the primary
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = ListVolumeGroups()

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      # on failure the value is the error string instead of a list;
      # callers must be prepared for either
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  return result
468

    
469

    
470
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  # fix: use a raw string so the backslash-escaped separators reach
  # the regex engine verbatim instead of relying on Python passing
  # unknown escapes through unchanged
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr positions 5 and 6 — assumed to be the state and device-open
    # flags of lvm's attribute string; see the lvs man page
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
509

    
510

    
511
def ListVolumeGroups():
  """List the volume groups and their size.

  Thin wrapper over the utils module.

  @rtype: dict
  @return: dictionary mapping volume group name to its size

  """
  return utils.ListVolumeGroups()
520

    
521

    
522
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return []

  def parse_dev(dev):
    # drop the "(extent)" part lvs appends after the device name
    if '(' in dev:
      return dev.split('(')[0]
    return dev

  def map_line(fields):
    return {
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': parse_dev(fields[2].strip()),
      'vg': fields[3].strip(),
    }

  volumes = []
  for line in result.stdout.splitlines():
    # only lines with all four fields are valid entries
    if line.count('|') >= 3:
      volumes.append(map_line(line.split('|')))
  return volumes
565

    
566

    
567
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  Stops at the first missing bridge.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  for bridge_name in bridges_list:
    if not utils.BridgeExists(bridge_name):
      return False
  return True
579

    
580

    
581
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  @raise errors.HypervisorError: re-raised after logging if any
      hypervisor fails to enumerate its instances

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError:
      # fix: the log message misspelled "hypervisor" as "hypevisor"
      logging.exception("Error enumerating instances for hypervisor %s", hname)
      raise

  return results
603

    
604

    
605
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    # unknown instance: empty result
    return {}
  # iinfo tuple positions: 2 = memory, 4 = state, 5 = cpu time
  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
  }
629

    
630

    
631
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if instance.name not in hyper.ListInstances():
    return (False, 'not running')

  # every disk must have its symlink in place, otherwise the instance
  # predates the symlink scheme
  for idx, _ in enumerate(instance.disks):
    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
    if not os.path.islink(link_name):
      return (False, 'not restarted since ganeti 1.2.5')

  return (True, '')
653

    
654

    
655
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus
  @raise errors.HypervisorError: if the same instance is reported by
      two hypervisors with different memory or vcpus values

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      # each entry is (name, id, memory, vcpus, state, cpu time)
      for name, inst_id, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              raise errors.HypervisorError("Instance %s is running twice"
                                           " with different parameters" % name)
        # later hypervisors overwrite earlier (consistent) entries
        output[name] = value

  return output
696

    
697

    
698
def InstanceOsAdd(instance):
699
  """Add an OS to an instance.
700

701
  @type instance: L{objects.Instance}
702
  @param instance: Instance whose OS is to be installed
703
  @rtype: boolean
704
  @return: the success of the operation
705

706
  """
707
  try:
708
    inst_os = OSFromDisk(instance.os)
709
  except errors.InvalidOS, err:
710
    os_name, os_dir, os_err = err.args
711
    if os_dir is None:
712
      return (False, "Can't find OS '%s': %s" % (os_name, os_err))
713
    else:
714
      return (False, "Error parsing OS '%s' in directory %s: %s" %
715
              (os_name, os_dir, os_err))
716

    
717
  create_env = OSEnvironment(instance)
718

    
719
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
720
                                     instance.name, int(time.time()))
721

    
722
  result = utils.RunCmd([inst_os.create_script], env=create_env,
723
                        cwd=inst_os.path, output=logfile,)
724
  if result.failed:
725
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
726
                  " output: %s", result.cmd, result.fail_reason, logfile,
727
                  result.output)
728
    lines = [utils.SafeEncode(val)
729
             for val in utils.TailFile(logfile, lines=20)]
730
    return (False, "OS create script failed (%s), last lines in the"
731
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
732

    
733
  return (True, "Successfully installed")
734

    
735

    
736
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: tuple
  @return: (success, message) describing the outcome

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # fix: this runs the rename script, but the message used to claim
    # "os create command"
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    return (False, "OS rename script failed (%s), last lines in the"
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Rename successful")
768

    
769

    
770
def _GetVGInfo(vg_name):
  """Get information about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError:
      # fix: the exception object was bound to an unused name; the
      # traceback is already captured by logging.exception
      logging.exception("Fail to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
808

    
809

    
810
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the symlink path for disk ``idx`` of the given instance."""
  link_basename = "%s:%d" % (instance_name, idx)
  return os.path.join(constants.DISK_LINKS_DIR, link_basename)
813

    
814

    
815
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).

  @type instance_name: str
  @param instance_name: the name of the target instance
  @type device_path: str
  @param device_path: path of the physical block device, on the node
  @type idx: int
  @param idx: the disk index
  @rtype: str
  @return: absolute path to the disk's symlink
  @raise OSError: re-raised for any failure other than EEXIST

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      # a link already exists; replace it only if it is not already a
      # symlink pointing at the desired device
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name
841

    
842

    
843
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @type instance_name: str
  @param instance_name: the instance whose links should be removed
  @type disks: list
  @param disks: the instance's disks; only their count matters, since
      the link names are derived from the disk index

  """
  # fix: the disk object itself was bound but never used
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        # best-effort cleanup: log and continue with the other links
        logging.exception("Can't remove symlink '%s'", link_name)
854

    
855

    
856
def _GatherAndLinkBlockDevs(instance):
857
  """Set up an instance's block device(s).
858

859
  This is run on the primary node at instance startup. The block
860
  devices must be already assembled.
861

862
  @type instance: L{objects.Instance}
863
  @param instance: the instance whose disks we shoul assemble
864
  @rtype: list
865
  @return: list of (disk_object, device_path)
866

867
  """
868
  block_devices = []
869
  for idx, disk in enumerate(instance.disks):
870
    device = _RecursiveFindBD(disk)
871
    if device is None:
872
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
873
                                    str(disk))
874
    device.Open()
875
    try:
876
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
877
    except OSError, e:
878
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
879
                                    e.strerror)
880

    
881
    block_devices.append((disk, link_name))
882

    
883
  return block_devices
884

    
885

    
886
def StartInstance(instance):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: tuple
  @return: (success, message) describing the outcome

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    # starting an already-running instance is not an error
    return (True, "Already running")

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices)
  except errors.BlockDeviceError, err:
    logging.exception("Failed to start instance")
    return (False, "Block device error: %s" % str(err))
  except errors.HypervisorError, err:
    logging.exception("Failed to start instance")
    # the symlinks were created before the hypervisor failed, so
    # clean them up again; the BlockDeviceError branch above happens
    # before or during linking and does no cleanup
    _RemoveBlockDevLinks(instance.name, instance.disks)
    return (False, "Hypervisor error: %s" % str(err))

  return (True, "Instance started successfully")
913

    
914

    
915
def InstanceShutdown(instance):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout (one
      initial second, then up to 11 polls ten seconds apart) before
      falling back to a forced destroy.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: tuple
  @return: a (success, message) pair

  """
  hv_name = instance.hypervisor
  if instance.name not in GetInstanceList([hv_name]):
    return (True, "Instance already stopped")

  hyper = hypervisor.GetHypervisor(hv_name)
  try:
    hyper.StopInstance(instance)
  except errors.HypervisorError as err:
    msg = "Failed to stop instance %s: %s" % (instance.name, err)
    logging.error(msg)
    return (False, msg)

  # wait for the clean shutdown to take effect
  time.sleep(1)
  stopped = False
  for _ in range(11):
    if instance.name not in GetInstanceList([hv_name]):
      stopped = True
      break
    time.sleep(10)

  if not stopped:
    # the clean shutdown did not work, fall back to destroying the domain
    logging.error("Shutdown of '%s' unsuccessful, using destroy",
                  instance.name)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError as err:
      msg = "Failed to force stop instance %s: %s" % (instance.name, err)
      logging.error(msg)
      return (False, msg)

    time.sleep(1)
    if instance.name in GetInstanceList([hv_name]):
      msg = ("Could not shutdown instance %s even by destroy" %
             instance.name)
      logging.error(msg)
      return (False, msg)

  _RemoveBlockDevLinks(instance.name, instance.disks)

  return (True, "Instance has been shutdown successfully")
def InstanceReboot(instance, reboot_type):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the kind of reboot to perform:
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and restart the
        VM at the hypervisor level (implemented as shutdown + start)
      - L{constants.INSTANCE_REBOOT_SOFT}: reboot only the instance
        OS, keeping the VM running
      - L{constants.INSTANCE_REBOOT_FULL} is deliberately not handled
        here; cmdlib translates it into a full stop/start cycle
        instead of a call_instance_reboot RPC
  @rtype: tuple
  @return: a (success, message) pair

  """
  # the not-running check must stay first: its error message takes
  # precedence over the invalid-type one, as in the original contract
  if instance.name not in GetInstanceList([instance.hypervisor]):
    msg = "Cannot reboot instance %s that is not running" % instance.name
    logging.error(msg)
    return (False, msg)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      stop_result = InstanceShutdown(instance)
      if not stop_result[0]:
        return stop_result
      return StartInstance(instance)
    except errors.HypervisorError as err:
      msg = "Failed to hard reboot instance %s: %s" % (instance.name, err)
      logging.error(msg)
      return (False, msg)
  elif reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError as err:
      msg = "Failed to soft reboot instance %s: %s" % (instance.name, err)
      logging.error(msg)
      return (False, msg)
  else:
    return (False, "Invalid reboot_type received: %s" % (reboot_type,))

  return (True, "Reboot successful")
def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @rtype: tuple
  @return: (True, info) with the hypervisor's opaque migration data,
      or (False, message) on hypervisor errors

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return (True, hyper.MigrationInfo(instance))
  except errors.HypervisorError as err:
    msg = "Failed to fetch migration information"
    logging.exception(msg)
    return (False, '%s: %s' % (msg, err))
def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node
  @rtype: tuple
  @return: a (success, message) pair

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError as err:
    msg = "Failed to accept instance"
    logging.exception(msg)
    return (False, '%s: %s' % (msg, err))
  else:
    return (True, "Accept successful")
def FinalizeMigration(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure
  @rtype: tuple
  @return: a (success, message) pair

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigration(instance, info, success)
  except errors.HypervisorError as err:
    msg = "Failed to finalize migration"
    logging.exception(msg)
    return (False, '%s: %s' % (msg, err))
  else:
    return (True, "Migration Finalized")
def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @rtype: tuple
  @return: a tuple of (success, msg) where:
      - success is a boolean denoting the success/failure of the operation
      - msg is a string with details in case of failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance.name, target, live)
  except errors.HypervisorError as err:
    msg = "Failed to migrate instance"
    logging.exception(msg)
    return (False, "%s: %s" % (msg, err))
  return (True, "Migration successful")
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: a (success, payload) pair; on success the payload is the
      new unique_id of the device (this can sometimes be computed
      only after creation), on failure it is the error message

  """
  children = []
  if disk.children:
    for child in disk.children:
      try:
        child_dev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError as err:
        errmsg = "Can't assemble device %s: %s" % (child, err)
        logging.error(errmsg)
        return False, errmsg
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          child_dev.Open()
        except errors.BlockDeviceError as err:
          errmsg = "Can't make child '%s' read-write: %s" % (child, err)
          logging.error(errmsg)
          return False, errmsg
      children.append(child_dev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, children, disk.size)
  except errors.BlockDeviceError as err:
    return False, "Can't create block device: %s" % str(err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError as err:
      errmsg = ("Can't assemble device after creation, very"
                " unusual event: %s" % str(err))
      logging.error(errmsg)
      return False, errmsg
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError as err:
        errmsg = ("Can't make device r/w after creation, very"
                  " unusual event: %s" % str(err))
        logging.error(errmsg)
        return False, errmsg
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return True, device.unique_id
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: tuple
  @return: a (success, message) pair; the message aggregates the
      errors from this device and all its children

  """
  errs = []
  success = True
  try:
    dev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError as err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    dev = None
  if dev is not None:
    dev_path = dev.dev_path
    try:
      dev.Remove()
    except errors.BlockDeviceError as err:
      errs.append(str(err))
      success = False
    if success:
      DevCacheManager.RemoveCache(dev_path)

  for child in (disk.children or []):
    child_ok, child_msg = BlockdevRemove(child)
    success = success and child_ok
    if child_msg: # not an empty message
      errs.append(child_msg)

  return (success, "; ".join(errs))
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device object, or C{True} when the device
      did not need assembly on this (secondary) node
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    max_missing = disk.ChildrenNeeded()
    if max_missing == -1:
      max_missing = 0 # max number of Nones allowed
    else:
      max_missing = len(disk.children) - max_missing # max number of Nones
    for child in disk.children:
      try:
        child_dev = _RecursiveAssembleBD(child, owner, as_primary)
      except errors.BlockDeviceError as err:
        # tolerate up to max_missing failed children before giving up
        if children.count(None) >= max_missing:
          raise
        child_dev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(child_dev)

  if as_primary or disk.AssembleOnSecondary():
    dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    dev.SetSyncSpeed(constants.SYNC_SPEED)
    if as_primary or disk.OpenOnSecondary():
      dev.Open()
    DevCacheManager.UpdateCache(dev.dev_path, owner,
                                as_primary, disk.iv_name)
    return dev

  return True
def BlockdevAssemble(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: tuple
  @return: (True, result) where result is a C{/dev/...} path for
      primary nodes and C{True} for secondary nodes, or
      (False, error message) on failure

  """
  ok = True
  payload = "no error information"
  try:
    payload = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(payload, bdev.BlockDev):
      payload = payload.dev_path
  except errors.BlockDeviceError as err:
    payload = "Error while assembling disk: %s" % str(err)
    ok = False
  return (ok, payload)
def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: tuple
  @return: a (success, message) pair; the message aggregates the
      errors from this device and all its children

  """
  errs = []
  success = True
  dev = _RecursiveFindBD(disk)
  if dev is not None:
    dev_path = dev.dev_path
    try:
      dev.Shutdown()
      DevCacheManager.RemoveCache(dev_path)
    except errors.BlockDeviceError as err:
      errs.append(str(err))
      success = False

  for child in (disk.children or []):
    child_ok, child_msg = BlockdevShutdown(child)
    success = success and child_ok
    if child_msg: # not an empty message
      errs.append(child_msg)

  return (success, "; ".join(errs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in new_bdevs:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  devs = []
  for disk in new_cdevs:
    # prefer the static path; fall back to looking up the live device
    rpath = disk.StaticDevPath()
    if rpath is not None:
      devs.append(rpath)
      continue
    bd = _RecursiveFindBD(disk)
    if bd is None:
      logging.error("Can't find dynamic device %s while removing children",
                    disk)
      return False
    devs.append(bd.dev_path)
  parent_bdev.RemoveChildren(devs)
  return True
1391
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for disk in disks:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(disk))
    stats.append(dev.CombinedSyncStatus())
  return stats
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  if disk.children:
    children = [_RecursiveFindBD(child) for child in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: tuple
  @return: (False, error message) on lookup errors; (True, None) if
      the disk cannot be found; otherwise (True, (device_path, major,
      minor, sync_percent, estimated_time, is_degraded))

  """
  try:
    dev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError as err:
    return (False, str(err))
  if dev is None:
    return (True, None)
  return (True, (dev.dev_path, dev.major, dev.minor) + dev.GetSyncStatus())
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1456
  """Write a file to the filesystem.
1457

1458
  This allows the master to overwrite(!) a file. It will only perform
1459
  the operation if the file belongs to a list of configuration files.
1460

1461
  @type file_name: str
1462
  @param file_name: the target file name
1463
  @type data: str
1464
  @param data: the new contents of the file
1465
  @type mode: int
1466
  @param mode: the mode to give the file (can be None)
1467
  @type uid: int
1468
  @param uid: the owner of the file (can be -1 for default)
1469
  @type gid: int
1470
  @param gid: the group of the file (can be -1 for default)
1471
  @type atime: float
1472
  @param atime: the atime to set on the file (can be None)
1473
  @type mtime: float
1474
  @param mtime: the mtime to set on the file (can be None)
1475
  @rtype: boolean
1476
  @return: the success of the operation; errors are logged
1477
      in the node daemon log
1478

1479
  """
1480
  if not os.path.isabs(file_name):
1481
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
1482
                  file_name)
1483
    return False
1484

    
1485
  if file_name not in _ALLOWED_UPLOAD_FILES:
1486
    logging.error("Filename passed to UploadFile not in allowed"
1487
                 " upload targets: '%s'", file_name)
1488
    return False
1489

    
1490
  raw_data = _Decompress(data)
1491

    
1492
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1493
                  atime=atime, mtime=mtime)
1494
  return True
1495

    
1496

    
1497
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
def _ErrnoOrStr(err):
1507
  """Format an EnvironmentError exception.
1508

1509
  If the L{err} argument has an errno attribute, it will be looked up
1510
  and converted into a textual C{E...} description. Otherwise the
1511
  string representation of the error will be returned.
1512

1513
  @type err: L{EnvironmentError}
1514
  @param err: the exception to format
1515

1516
  """
1517
  if hasattr(err, 'errno'):
1518
    detail = errno.errorcode[err.errno]
1519
  else:
1520
    detail = str(err)
1521
  return detail
1522

    
1523

    
1524
def _OSOndiskVersion(name, os_dir):
1525
  """Compute and return the API version of a given OS.
1526

1527
  This function will try to read the API version of the OS given by
1528
  the 'name' parameter and residing in the 'os_dir' directory.
1529

1530
  @type name: str
1531
  @param name: the OS name we should look for
1532
  @type os_dir: str
1533
  @param os_dir: the directory inwhich we should look for the OS
1534
  @rtype: int or None
1535
  @return:
1536
      Either an integer denoting the version or None in the
1537
      case when this is not a valid OS name.
1538
  @raise errors.InvalidOS: if the OS cannot be found
1539

1540
  """
1541
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1542

    
1543
  try:
1544
    st = os.stat(api_file)
1545
  except EnvironmentError, err:
1546
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1547
                           " found (%s)" % _ErrnoOrStr(err))
1548

    
1549
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1550
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1551
                           " a regular file")
1552

    
1553
  try:
1554
    f = open(api_file)
1555
    try:
1556
      api_versions = f.readlines()
1557
    finally:
1558
      f.close()
1559
  except EnvironmentError, err:
1560
    raise errors.InvalidOS(name, os_dir, "error while reading the"
1561
                           " API version (%s)" % _ErrnoOrStr(err))
1562

    
1563
  api_versions = [version.strip() for version in api_versions]
1564
  try:
1565
    api_versions = [int(version) for version in api_versions]
1566
  except (TypeError, ValueError), err:
1567
    raise errors.InvalidOS(name, os_dir,
1568
                           "API version is not integer (%s)" % str(err))
1569

    
1570
  return api_versions
1571

    
1572

    
1573
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: an OS object for each name in all the given
      directories

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if not os.path.isdir(dir_name):
      continue
    try:
      entries = utils.ListVisibleFiles(dir_name)
    except EnvironmentError as err:
      logging.exception("Can't list the OS directory %s", dir_name)
      # NOTE(review): a listing failure aborts the whole scan (break,
      # not continue), skipping any remaining search dirs — presumably
      # intentional; confirm before changing
      break
    for name in entries:
      try:
        result.append(OSFromDisk(name, base_dir=dir_name))
      except errors.InvalidOS as err:
        result.append(objects.OS.FromInvalidOS(err))

  return result
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise errors.InvalidOS: if we don't find a valid OS

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_versions = _OSOndiskVersion(name, os_dir)

  if constants.OS_API_VERSION not in api_versions:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_versions, constants.OS_API_VERSION))

  # build the script path map, validating each script as we go
  os_scripts = {}
  for script in constants.OS_SCRIPTS:
    script_path = os.path.sep.join([os_dir, script])
    os_scripts[script] = script_path

    try:
      st = os.stat(script_path)
    except EnvironmentError as err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)

  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
                    api_versions=api_versions)
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if a disk's block device
      cannot be found

  """
  env = {
    'OS_API_VERSION': '%d' % constants.OS_API_VERSION,
    'INSTANCE_NAME': instance.name,
    'INSTANCE_OS': instance.os,
    'HYPERVISOR': instance.hypervisor,
    'DISK_COUNT': '%d' % len(instance.disks),
    'NIC_COUNT': '%d' % len(instance.nics),
    'DEBUG_LEVEL': '%d' % debug,
    }
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    env['DISK_%d_PATH' % idx] = real_disk.dev_path
    env['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      env['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      env['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      env['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    env['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      env['NIC_%d_IP' % idx] = nic.ip
    env['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      env['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # export all backend/hypervisor parameters as well
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return env
def BlockdevGrow(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @rtype: (status, result)
  @return: a tuple with the status of the operation
      (True/False), and the errors message if status
      is False

  """
  dev = _RecursiveFindBD(disk)
  if dev is None:
    return False, "Cannot find block device %s" % (disk,)

  try:
    dev.Grow(amount)
  except errors.BlockDeviceError as err:
    return False, str(err)

  return True, None
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path (None if the device is not active, or
      implicitly None when no child of a multi-child disk matches the
      parent's size)

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return BlockdevSnapshot(disk.children[0])
    # more than one child, choose one that matches the parent's size
    for child in disk.children:
      if child.size == disk.size:
        return BlockdevSnapshot(child)
    # NOTE: falls through with an implicit None when no child matches
  elif disk.dev_type == constants.LD_LV:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      return None
    # let's stay on the safe side and ask for the full size, for now
    return dev.Snapshot(disk.size)
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))
1776
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1777
  """Export a block device snapshot to a remote node.
1778

1779
  @type disk: L{objects.Disk}
1780
  @param disk: the description of the disk to export
1781
  @type dest_node: str
1782
  @param dest_node: the destination node to export to
1783
  @type instance: L{objects.Instance}
1784
  @param instance: the instance object to whom the disk belongs
1785
  @type cluster_name: str
1786
  @param cluster_name: the cluster name, needed for SSH hostalias
1787
  @type idx: int
1788
  @param idx: the index of the disk in the instance's disk list,
1789
      used to export to the OS scripts environment
1790
  @rtype: boolean
1791
  @return: the success of the operation
1792

1793
  """
1794
  export_env = OSEnvironment(instance)
1795

    
1796
  inst_os = OSFromDisk(instance.os)
1797
  export_script = inst_os.export_script
1798

    
1799
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1800
                                     instance.name, int(time.time()))
1801
  if not os.path.exists(constants.LOG_OS_DIR):
1802
    os.mkdir(constants.LOG_OS_DIR, 0750)
1803
  real_disk = _RecursiveFindBD(disk)
1804
  if real_disk is None:
1805
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1806
                                  str(disk))
1807
  real_disk.Open()
1808

    
1809
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
1810
  export_env['EXPORT_INDEX'] = str(idx)
1811

    
1812
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1813
  destfile = disk.physical_id[1]
1814

    
1815
  # the target command is built out of three individual commands,
1816
  # which are joined by pipes; we check each individual command for
1817
  # valid parameters
1818
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1819
                               export_script, logfile)
1820

    
1821
  comprcmd = "gzip"
1822

    
1823
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1824
                                destdir, destdir, destfile)
1825
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1826
                                                   constants.GANETI_RUNAS,
1827
                                                   destcmd)
1828

    
1829
  # all commands have been checked, so we're safe to combine them
1830
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1831

    
1832
  result = utils.RunCmd(command, env=export_env)
1833

    
1834
  if result.failed:
1835
    logging.error("os snapshot export command '%s' returned error: %s"
1836
                  " output: %s", command, result.fail_reason, result.output)
1837
    return False
1838

    
1839
  return True
1840

    
1841

    
1842
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: boolean
  @return: the success of the operation

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for idx, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' % idx, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % idx, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % idx,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_total)

  disk_total = 0
  for idx, disk in enumerate(snap_disks):
    if not disk:
      # skip empty slots, but keep the index numbering
      continue
    disk_total += 1
    config.set(constants.INISECT_INS, 'disk%d_ivname' % idx,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % idx,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % idx,
               ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # atomically (well, almost) replace any previous export of this name
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True


def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info, or None if the mandatory sections are missing

  """
  config = objects.SerializableConfigParser()
  config.read(os.path.join(dest, constants.EXPORT_CONF_FILE))

  # both the export and the instance sections must be present for a
  # valid export
  if (config.has_section(constants.INISECT_EXP) and
      config.has_section(constants.INISECT_INS)):
    return config
  return None


def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1932
  """Import an os image into an instance.
1933

1934
  @type instance: L{objects.Instance}
1935
  @param instance: instance to import the disks into
1936
  @type src_node: string
1937
  @param src_node: source node for the disk images
1938
  @type src_images: list of string
1939
  @param src_images: absolute paths of the disk images
1940
  @rtype: list of boolean
1941
  @return: each boolean represent the success of importing the n-th disk
1942

1943
  """
1944
  import_env = OSEnvironment(instance)
1945
  inst_os = OSFromDisk(instance.os)
1946
  import_script = inst_os.import_script
1947

    
1948
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1949
                                        instance.name, int(time.time()))
1950
  if not os.path.exists(constants.LOG_OS_DIR):
1951
    os.mkdir(constants.LOG_OS_DIR, 0750)
1952

    
1953
  comprcmd = "gunzip"
1954
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1955
                               import_script, logfile)
1956

    
1957
  final_result = []
1958
  for idx, image in enumerate(src_images):
1959
    if image:
1960
      destcmd = utils.BuildShellCmd('cat %s', image)
1961
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1962
                                                       constants.GANETI_RUNAS,
1963
                                                       destcmd)
1964
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1965
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1966
      import_env['IMPORT_INDEX'] = str(idx)
1967
      result = utils.RunCmd(command, env=import_env)
1968
      if result.failed:
1969
        logging.error("Disk import command '%s' returned error: %s"
1970
                      " output: %s", command, result.fail_reason,
1971
                      result.output)
1972
        final_result.append(False)
1973
      else:
1974
        final_result.append(True)
1975
    else:
1976
      final_result.append(True)
1977

    
1978
  return final_result
1979

    
1980

    
1981
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if not os.path.isdir(constants.EXPORT_DIR):
    # no export directory means no exports
    return []
  return utils.ListVisibleFiles(constants.EXPORT_DIR)


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: boolean
  @return: the success of the operation

  """
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.
  shutil.rmtree(os.path.join(constants.EXPORT_DIR, export))
  return True


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the unique_id we rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  success = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      success = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      if old_rpath != dev.dev_path:
        # the device path changed, so the old cache entry is stale
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      success = False
  return success


def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = os.path.normpath(cfg.GetFileStorageDir())
  # bug fix: os.path.commonprefix compares characters, not path
  # components, so a sibling directory such as "<base>-evil" would
  # wrongly pass the old check; compare on a component boundary instead
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir


def CreateFileStorageDir(file_storage_dir):
2074
  """Create file storage directory.
2075

2076
  @type file_storage_dir: str
2077
  @param file_storage_dir: directory to create
2078

2079
  @rtype: tuple
2080
  @return: tuple with first element a boolean indicating wheter dir
2081
      creation was successful or not
2082

2083
  """
2084
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2085
  result = True,
2086
  if not file_storage_dir:
2087
    result = False,
2088
  else:
2089
    if os.path.exists(file_storage_dir):
2090
      if not os.path.isdir(file_storage_dir):
2091
        logging.error("'%s' is not a directory", file_storage_dir)
2092
        result = False,
2093
    else:
2094
      try:
2095
        os.makedirs(file_storage_dir, 0750)
2096
      except OSError, err:
2097
        logging.error("Cannot create file storage directory '%s': %s",
2098
                      file_storage_dir, err)
2099
        result = False,
2100
  return result
2101

    
2102

    
2103
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if not file_storage_dir:
    # path invalid or outside the allowed base directory
    return (False,)
  if not os.path.exists(file_storage_dir):
    # nothing to remove
    return (True,)
  if not os.path.isdir(file_storage_dir):
    # bug fix: the old code logged this error but still attempted
    # os.rmdir() on the non-directory, producing a second error log
    logging.error("'%s' is not a directory", file_storage_dir)
    return (False,)
  # deletes dir only if empty, otherwise we want to return False
  try:
    os.rmdir(file_storage_dir)
  except OSError:
    logging.exception("Cannot remove file storage directory '%s'",
                      file_storage_dir)
    return (False,)
  return (True,)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not old_file_storage_dir or not new_file_storage_dir:
    # one of the paths is invalid or outside the allowed base
    return (False,)
  if os.path.exists(new_file_storage_dir):
    if os.path.exists(old_file_storage_dir):
      logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                    old_file_storage_dir, new_file_storage_dir)
      return (False,)
    # NOTE: target exists but source doesn't — treated as success
    # (nothing to do), matching the historical behaviour
    return (True,)
  if not os.path.isdir(old_file_storage_dir):
    logging.error("'%s' is not a directory", old_file_storage_dir)
    return (False,)
  try:
    os.rename(old_file_storage_dir, new_file_storage_dir)
  except OSError:
    logging.exception("Cannot rename '%s' to '%s'",
                      old_file_storage_dir, new_file_storage_dir)
    return (False,)
  return (True,)


def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: boolean
  @return: whether the file is under the queue directory

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # bug fix: os.path.commonprefix compares characters, not path
  # components, so a sibling such as "<queue_dir>-x/file" would wrongly
  # be accepted; normalize the candidate (also defusing "..") and check
  # on a component boundary instead
  file_name = os.path.normpath(file_name)
  result = file_name.startswith(queue_dir + os.sep)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  if _IsJobQueueFile(file_name):
    # write and replace the file atomically
    utils.WriteFile(file_name, data=_Decompress(content))
    return True
  return False


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: boolean
  @return: the success of the operation

  """
  # both source and destination must live inside the queue directory
  if not _IsJobQueueFile(old):
    return False
  if not _IsJobQueueFile(new):
    return False

  utils.RenameFile(old, new, mkdir=True)
  return True


def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: boolean
  @return: always True
  @warning: the function always returns True

  """
  # the flag is represented by the mere existence of the drain file
  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  else:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  return True


def BlockdevClose(instance_name, disks):
2255
  """Closes the given block devices.
2256

2257
  This means they will be switched to secondary mode (in case of
2258
  DRBD).
2259

2260
  @param instance_name: if the argument is not empty, the symlinks
2261
      of this instance will be removed
2262
  @type disks: list of L{objects.Disk}
2263
  @param disks: the list of disks to be closed
2264
  @rtype: tuple (success, message)
2265
  @return: a tuple of success and message, where success
2266
      indicates the succes of the operation, and message
2267
      which will contain the error details in case we
2268
      failed
2269

2270
  """
2271
  bdevs = []
2272
  for cf in disks:
2273
    rd = _RecursiveFindBD(cf)
2274
    if rd is None:
2275
      return (False, "Can't find device %s" % cf)
2276
    bdevs.append(rd)
2277

    
2278
  msg = []
2279
  for rd in bdevs:
2280
    try:
2281
      rd.Close()
2282
    except errors.BlockDeviceError, err:
2283
      msg.append(str(err))
2284
  if msg:
2285
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
2286
  else:
2287
    if instance_name:
2288
      _RemoveBlockDevLinks(instance_name, disks)
2289
    return (True, "All devices secondary")
2290

    
2291

    
2292
def ValidateHVParams(hvname, hvparams):
2293
  """Validates the given hypervisor parameters.
2294

2295
  @type hvname: string
2296
  @param hvname: the hypervisor name
2297
  @type hvparams: dict
2298
  @param hvparams: the hypervisor parameters to be validated
2299
  @rtype: tuple (success, message)
2300
  @return: a tuple of success and message, where success
2301
      indicates the succes of the operation, and message
2302
      which will contain the error details in case we
2303
      failed
2304

2305
  """
2306
  try:
2307
    hv_type = hypervisor.GetHypervisor(hvname)
2308
    hv_type.ValidateParameters(hvparams)
2309
    return (True, "Validation passed")
2310
  except errors.HypervisorError, err:
2311
    return (False, str(err))
2312

    
2313

    
2314
def DemoteFromMC():
2315
  """Demotes the current node from master candidate role.
2316

2317
  """
2318
  # try to ensure we're not the master by mistake
2319
  master, myself = ssconf.GetMasterAndMyself()
2320
  if master == myself:
2321
    return (False, "ssconf status shows I'm the master node, will not demote")
2322
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2323
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2324
    return (False, "The master daemon is running, will not demote")
2325
  try:
2326
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2327
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2328
  except EnvironmentError, err:
2329
    if err.errno != errno.ENOENT:
2330
      return (False, "Error while backing up cluster file: %s" % str(err))
2331
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2332
  return (True, "Done")
2333

    
2334

    
2335
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # fill in the node-dependent part of each disk's physical ID first
  local_name = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(local_name, nodes_ip)

  bdevs = []
  for disk in disks:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      return (False, "Can't find device %s" % disk)
    bdevs.append(dev)
  return (True, bdevs)


def DrbdDisconnectNet(nodes_ip, disks):
2355
  """Disconnects the network on a list of drbd devices.
2356

2357
  """
2358
  status, bdevs = _FindDisks(nodes_ip, disks)
2359
  if not status:
2360
    return status, bdevs
2361

    
2362
  # disconnect disks
2363
  for rd in bdevs:
2364
    try:
2365
      rd.DisconnectNet()
2366
    except errors.BlockDeviceError, err:
2367
      logging.exception("Failed to go into standalone mode")
2368
      return (False, "Can't change network configuration: %s" % str(err))
2369
  return (True, "All disks are now disconnected")
2370

    
2371

    
2372
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  Re-attaches the network configuration on all the given DRBD devices
  and waits until they are connected (or resyncing); in multimaster
  mode the devices are also symlinked and promoted to primary.

  @param nodes_ip: mapping used to compute the disks' physical IDs
  @type disks: list of L{objects.Disk}
  @param disks: the disks to reconnect
  @type instance_name: str
  @param instance_name: instance owning the disks (used for symlinks)
  @type multimaster: boolean
  @param multimaster: if True, also create symlinks and switch the
      devices to primary mode
  @rtype: tuple (success, message)
  @return: success flag and status/error message

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        return (False, "Can't create symlink: %s" % str(err))
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      return (False, "Can't change network configuration: %s" % str(err))
  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
  RECONNECT_TIMEOUT = 2 * 60
  sleep_time = 0.100 # start with 100 milliseconds
  timeout_limit = time.time() + RECONNECT_TIMEOUT
  while time.time() < timeout_limit:
    all_connected = True
    for rd in bdevs:
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        all_connected = False
      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          return (False, "Can't change network configuration: %s" % str(err))
    if all_connected:
      break
    # exponential backoff, capped at 5 seconds between polls
    time.sleep(sleep_time)
    sleep_time = min(5, sleep_time * 1.5)
  if not all_connected:
    return (False, "Timeout in disk reconnecting")
  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        return (False, "Can't change to primary mode: %s" % str(err))
  if multimaster:
    msg = "multi-master and primary"
  else:
    msg = "single-master"
  return (True, "Disks are now configured as %s" % msg)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @rtype: tuple (success, (alldone, min_resync))
  @return: success is False if any device is neither connected nor
      resyncing; alldone tells whether all devices finished syncing,
      and min_resync is the lowest sync percentage seen

  """
  found, bdevs = _FindDisks(nodes_ip, disks)
  if not found:
    return found, bdevs

  min_resync = 100
  alldone = True
  for dev in bdevs:
    stats = dev.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      # a device that is neither connected nor resyncing is broken;
      # report failure with the data gathered so far
      return (False, (alldone, min_resync))
    if stats.is_in_resync:
      alldone = False
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)
  return (True, (alldone, min_resync))


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  # hook scripts whose names don't match this mask are skipped
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    Runs the script with stdin from /dev/null and stdout/stderr merged;
    note that only the first 4096 bytes of output are captured.

    @type script: str
    @param script: the full path to the script
    @type env: dict
    @param env: the environment with which to exec the script
    @rtype: tuple (success, message)
    @return: a tuple of success and message, where success
        indicates the succes of the operation, and message
        which will contain the error details in case we
        failed

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # capped read: anything beyond 4096 bytes is discarded
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      # retry wait() on EINTR (interrupted by a signal)
      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, utils.SafeEncode(output.strip())

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError:
      # FIXME: must log output in case of failures
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # only executable regular files with an allowed name are run;
      # everything else is reported as skipped
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: four element tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from L{utils.RunResult})

    """
    script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                            os.path.isfile)
    if script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # dump the input data into a temporary file which the allocator
    # script receives as its single argument
    tmp_fd, tmp_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(tmp_fd, idata)
      os.close(tmp_fd)
      result = utils.RunCmd([script, tmp_name])
    finally:
      os.unlink(tmp_name)

    if result.failed:
      return (constants.IARUN_FAILURE, result.stdout, result.stderr,
              result.fail_reason)
    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  The cache lives as one small text file per device under
  L{constants.BDEV_CACHE_DIR}.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    Write failures are logged but otherwise ignored, as the cache is
    best-effort only.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError:
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError:
      # fixed copy-paste error: this is the removal path, not an update
      logging.exception("Can't remove bdev cache for %s", dev_path)