Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ b726aff0

History | View | Annotate | Download (81.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36
import zlib
37
import base64
38

    
39
from ganeti import errors
40
from ganeti import utils
41
from ganeti import ssh
42
from ganeti import hypervisor
43
from ganeti import constants
44
from ganeti import bdev
45
from ganeti import objects
46
from ganeti import ssconf
47

    
48

    
49
class RPCFail(Exception):
50
  """Class denoting RPC failure.
51

52
  Its argument is the error message.
53

54
  """
55

    
56
def _Fail(msg, *args, **kwargs):
57
  """Log an error and the raise an RPCFail exception.
58

59
  This exception is then handled specially in the ganeti daemon and
60
  turned into a 'failed' return type. As such, this function is a
61
  useful shortcut for logging the error and returning it to the master
62
  daemon.
63

64
  @type msg: string
65
  @param msg: the text of the exception
66
  @raise RPCFail
67

68
  """
69
  if args:
70
    msg = msg % args
71
  if "exc" in kwargs and kwargs["exc"]:
72
    logging.exception(msg)
73
  else:
74
    logging.error(msg)
75
  raise RPCFail(msg)
76

    
77

    
78
def _GetConfig():
79
  """Simple wrapper to return a SimpleStore.
80

81
  @rtype: L{ssconf.SimpleStore}
82
  @return: a SimpleStore instance
83

84
  """
85
  return ssconf.SimpleStore()
86

    
87

    
88
def _GetSshRunner(cluster_name):
89
  """Simple wrapper to return an SshRunner.
90

91
  @type cluster_name: str
92
  @param cluster_name: the cluster name, which is needed
93
      by the SshRunner constructor
94
  @rtype: L{ssh.SshRunner}
95
  @return: an SshRunner instance
96

97
  """
98
  return ssh.SshRunner(cluster_name)
99

    
100

    
101
def _Decompress(data):
102
  """Unpacks data compressed by the RPC client.
103

104
  @type data: list or tuple
105
  @param data: Data sent by RPC client
106
  @rtype: str
107
  @return: Decompressed data
108

109
  """
110
  assert isinstance(data, (list, tuple))
111
  assert len(data) == 2
112
  (encoding, content) = data
113
  if encoding == constants.RPC_ENCODING_NONE:
114
    return content
115
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
116
    return zlib.decompress(base64.b64decode(content))
117
  else:
118
    raise AssertionError("Unknown data encoding")
119

    
120

    
121
def _CleanDirectory(path, exclude=None):
122
  """Removes all regular files in a directory.
123

124
  @type path: str
125
  @param path: the directory to clean
126
  @type exclude: list
127
  @param exclude: list of files to be excluded, defaults
128
      to the empty list
129

130
  """
131
  if not os.path.isdir(path):
132
    return
133
  if exclude is None:
134
    exclude = []
135
  else:
136
    # Normalize excluded paths
137
    exclude = [os.path.normpath(i) for i in exclude]
138

    
139
  for rel_name in utils.ListVisibleFiles(path):
140
    full_name = os.path.normpath(os.path.join(path, rel_name))
141
    if full_name in exclude:
142
      continue
143
    if os.path.isfile(full_name) and not os.path.islink(full_name):
144
      utils.RemoveFile(full_name)
145

    
146

    
147
def JobQueuePurge():
148
  """Removes job queue files and archived jobs.
149

150
  @rtype: None
151

152
  """
153
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
154
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
155

    
156

    
157
def GetMasterInfo():
158
  """Returns master information.
159

160
  This is an utility function to compute master information, either
161
  for consumption here or from the node daemon.
162

163
  @rtype: tuple
164
  @return: (master_netdev, master_ip, master_name) if we have a good
165
      configuration, otherwise (None, None, None)
166

167
  """
168
  try:
169
    cfg = _GetConfig()
170
    master_netdev = cfg.GetMasterNetdev()
171
    master_ip = cfg.GetMasterIP()
172
    master_node = cfg.GetMasterNode()
173
  except errors.ConfigurationError, err:
174
    logging.exception("Cluster configuration incomplete")
175
    return (None, None, None)
176
  return (master_netdev, master_ip, master_node)
177

    
178

    
179
def StartMaster(start_daemons):
180
  """Activate local node as master node.
181

182
  The function will always try activate the IP address of the master
183
  (unless someone else has it). It will also start the master daemons,
184
  based on the start_daemons parameter.
185

186
  @type start_daemons: boolean
187
  @param start_daemons: whther to also start the master
188
      daemons (ganeti-masterd and ganeti-rapi)
189
  @rtype: None
190

191
  """
192
  master_netdev, master_ip, _ = GetMasterInfo()
193
  if not master_netdev:
194
    return False, "Cluster configuration incomplete, cannot read ssconf files"
195

    
196
  payload = []
197
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
198
    if utils.OwnIpAddress(master_ip):
199
      # we already have the ip:
200
      logging.debug("Master IP already configured, doing nothing")
201
    else:
202
      msg = "Someone else has the master ip, not activating"
203
      logging.error(msg)
204
      payload.append(msg)
205
  else:
206
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
207
                           "dev", master_netdev, "label",
208
                           "%s:0" % master_netdev])
209
    if result.failed:
210
      msg = "Can't activate master IP: %s" % result.output
211
      logging.error(msg)
212
      payload.append(msg)
213

    
214
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
215
                           "-s", master_ip, master_ip])
216
    # we'll ignore the exit code of arping
217

    
218
  # and now start the master and rapi daemons
219
  if start_daemons:
220
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
221
      result = utils.RunCmd([daemon])
222
      if result.failed:
223
        msg = "Can't start daemon %s: %s" % (daemon, result.output)
224
        logging.error(msg)
225
        payload.append(msg)
226

    
227
  return not payload, "; ".join(payload)
228

    
229

    
230
def StopMaster(stop_daemons):
231
  """Deactivate this node as master.
232

233
  The function will always try to deactivate the IP address of the
234
  master. It will also stop the master daemons depending on the
235
  stop_daemons parameter.
236

237
  @type stop_daemons: boolean
238
  @param stop_daemons: whether to also stop the master daemons
239
      (ganeti-masterd and ganeti-rapi)
240
  @rtype: None
241

242
  """
243
  master_netdev, master_ip, _ = GetMasterInfo()
244
  if not master_netdev:
245
    return False
246

    
247
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
248
                         "dev", master_netdev])
249
  if result.failed:
250
    logging.error("Can't remove the master IP, error: %s", result.output)
251
    # but otherwise ignore the failure
252

    
253
  if stop_daemons:
254
    # stop/kill the rapi and the master daemon
255
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
256
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
257

    
258
  return True
259

    
260

    
261
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
262
  """Joins this node to the cluster.
263

264
  This does the following:
265
      - updates the hostkeys of the machine (rsa and dsa)
266
      - adds the ssh private key to the user
267
      - adds the ssh public key to the users' authorized_keys file
268

269
  @type dsa: str
270
  @param dsa: the DSA private key to write
271
  @type dsapub: str
272
  @param dsapub: the DSA public key to write
273
  @type rsa: str
274
  @param rsa: the RSA private key to write
275
  @type rsapub: str
276
  @param rsapub: the RSA public key to write
277
  @type sshkey: str
278
  @param sshkey: the SSH private key to write
279
  @type sshpub: str
280
  @param sshpub: the SSH public key to write
281
  @rtype: boolean
282
  @return: the success of the operation
283

284
  """
285
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
286
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
287
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
288
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
289
  for name, content, mode in sshd_keys:
290
    utils.WriteFile(name, data=content, mode=mode)
291

    
292
  try:
293
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
294
                                                    mkdir=True)
295
  except errors.OpExecError, err:
296
    _Fail("Error while processing user ssh files: %s", err, exc=True)
297

    
298
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
299
    utils.WriteFile(name, data=content, mode=0600)
300

    
301
  utils.AddAuthorizedKey(auth_keys, sshpub)
302

    
303
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
304

    
305
  return (True, "Node added successfully")
306

    
307

    
308
def LeaveCluster():
309
  """Cleans up and remove the current node.
310

311
  This function cleans up and prepares the current node to be removed
312
  from the cluster.
313

314
  If processing is successful, then it raises an
315
  L{errors.QuitGanetiException} which is used as a special case to
316
  shutdown the node daemon.
317

318
  """
319
  _CleanDirectory(constants.DATA_DIR)
320
  JobQueuePurge()
321

    
322
  try:
323
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
324
  except errors.OpExecError:
325
    logging.exception("Error while processing ssh files")
326
    return
327

    
328
  f = open(pub_key, 'r')
329
  try:
330
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
331
  finally:
332
    f.close()
333

    
334
  utils.RemoveFile(priv_key)
335
  utils.RemoveFile(pub_key)
336

    
337
  # Return a reassuring string to the caller, and quit
338
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
339

    
340

    
341
def GetNodeInfo(vgname, hypervisor_type):
342
  """Gives back a hash with different informations about the node.
343

344
  @type vgname: C{string}
345
  @param vgname: the name of the volume group to ask for disk space information
346
  @type hypervisor_type: C{str}
347
  @param hypervisor_type: the name of the hypervisor to ask for
348
      memory information
349
  @rtype: C{dict}
350
  @return: dictionary with the following keys:
351
      - vg_size is the size of the configured volume group in MiB
352
      - vg_free is the free size of the volume group in MiB
353
      - memory_dom0 is the memory allocated for domain0 in MiB
354
      - memory_free is the currently available (free) ram in MiB
355
      - memory_total is the total number of ram in MiB
356

357
  """
358
  outputarray = {}
359
  vginfo = _GetVGInfo(vgname)
360
  outputarray['vg_size'] = vginfo['vg_size']
361
  outputarray['vg_free'] = vginfo['vg_free']
362

    
363
  hyper = hypervisor.GetHypervisor(hypervisor_type)
364
  hyp_info = hyper.GetNodeInfo()
365
  if hyp_info is not None:
366
    outputarray.update(hyp_info)
367

    
368
  f = open("/proc/sys/kernel/random/boot_id", 'r')
369
  try:
370
    outputarray["bootid"] = f.read(128).rstrip("\n")
371
  finally:
372
    f.close()
373

    
374
  return True, outputarray
375

    
376

    
377
def VerifyNode(what, cluster_name):
378
  """Verify the status of the local node.
379

380
  Based on the input L{what} parameter, various checks are done on the
381
  local node.
382

383
  If the I{filelist} key is present, this list of
384
  files is checksummed and the file/checksum pairs are returned.
385

386
  If the I{nodelist} key is present, we check that we have
387
  connectivity via ssh with the target nodes (and check the hostname
388
  report).
389

390
  If the I{node-net-test} key is present, we check that we have
391
  connectivity to the given nodes via both primary IP and, if
392
  applicable, secondary IPs.
393

394
  @type what: C{dict}
395
  @param what: a dictionary of things to check:
396
      - filelist: list of files for which to compute checksums
397
      - nodelist: list of nodes we should check ssh communication with
398
      - node-net-test: list of nodes we should check node daemon port
399
        connectivity with
400
      - hypervisor: list with hypervisors to run the verify for
401
  @rtype: dict
402
  @return: a dictionary with the same keys as the input dict, and
403
      values representing the result of the checks
404

405
  """
406
  result = {}
407

    
408
  if constants.NV_HYPERVISOR in what:
409
    result[constants.NV_HYPERVISOR] = tmp = {}
410
    for hv_name in what[constants.NV_HYPERVISOR]:
411
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
412

    
413
  if constants.NV_FILELIST in what:
414
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
415
      what[constants.NV_FILELIST])
416

    
417
  if constants.NV_NODELIST in what:
418
    result[constants.NV_NODELIST] = tmp = {}
419
    random.shuffle(what[constants.NV_NODELIST])
420
    for node in what[constants.NV_NODELIST]:
421
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
422
      if not success:
423
        tmp[node] = message
424

    
425
  if constants.NV_NODENETTEST in what:
426
    result[constants.NV_NODENETTEST] = tmp = {}
427
    my_name = utils.HostInfo().name
428
    my_pip = my_sip = None
429
    for name, pip, sip in what[constants.NV_NODENETTEST]:
430
      if name == my_name:
431
        my_pip = pip
432
        my_sip = sip
433
        break
434
    if not my_pip:
435
      tmp[my_name] = ("Can't find my own primary/secondary IP"
436
                      " in the node list")
437
    else:
438
      port = utils.GetNodeDaemonPort()
439
      for name, pip, sip in what[constants.NV_NODENETTEST]:
440
        fail = []
441
        if not utils.TcpPing(pip, port, source=my_pip):
442
          fail.append("primary")
443
        if sip != pip:
444
          if not utils.TcpPing(sip, port, source=my_sip):
445
            fail.append("secondary")
446
        if fail:
447
          tmp[name] = ("failure using the %s interface(s)" %
448
                       " and ".join(fail))
449

    
450
  if constants.NV_LVLIST in what:
451
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
452

    
453
  if constants.NV_INSTANCELIST in what:
454
    result[constants.NV_INSTANCELIST] = GetInstanceList(
455
      what[constants.NV_INSTANCELIST])
456

    
457
  if constants.NV_VGLIST in what:
458
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
459

    
460
  if constants.NV_VERSION in what:
461
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
462
                                    constants.RELEASE_VERSION)
463

    
464
  if constants.NV_HVINFO in what:
465
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
466
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
467

    
468
  if constants.NV_DRBDLIST in what:
469
    try:
470
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
471
    except errors.BlockDeviceError, err:
472
      logging.warning("Can't get used minors list", exc_info=True)
473
      used_minors = str(err)
474
    result[constants.NV_DRBDLIST] = used_minors
475

    
476
  return True, result
477

    
478

    
479
def GetVolumeList(vg_name):
480
  """Compute list of logical volumes and their size.
481

482
  @type vg_name: str
483
  @param vg_name: the volume group whose LVs we should list
484
  @rtype: dict
485
  @return:
486
      dictionary of all partions (key) with value being a tuple of
487
      their size (in MiB), inactive and online status::
488

489
        {'test1': ('20.06', True, True)}
490

491
      in case of errors, a string is returned with the error
492
      details.
493

494
  """
495
  lvs = {}
496
  sep = '|'
497
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
498
                         "--separator=%s" % sep,
499
                         "-olv_name,lv_size,lv_attr", vg_name])
500
  if result.failed:
501
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
502

    
503
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
504
  for line in result.stdout.splitlines():
505
    line = line.strip()
506
    match = valid_line_re.match(line)
507
    if not match:
508
      logging.error("Invalid line returned from lvs output: '%s'", line)
509
      continue
510
    name, size, attr = match.groups()
511
    inactive = attr[4] == '-'
512
    online = attr[5] == 'o'
513
    lvs[name] = (size, inactive, online)
514

    
515
  return lvs
516

    
517

    
518
def ListVolumeGroups():
519
  """List the volume groups and their size.
520

521
  @rtype: dict
522
  @return: dictionary with keys volume name and values the
523
      size of the volume
524

525
  """
526
  return True, utils.ListVolumeGroups()
527

    
528

    
529
def NodeVolumes():
530
  """List all volumes on this node.
531

532
  @rtype: list
533
  @return:
534
    A list of dictionaries, each having four keys:
535
      - name: the logical volume name,
536
      - size: the size of the logical volume
537
      - dev: the physical device on which the LV lives
538
      - vg: the volume group to which it belongs
539

540
    In case of errors, we return an empty list and log the
541
    error.
542

543
    Note that since a logical volume can live on multiple physical
544
    volumes, the resulting list might include a logical volume
545
    multiple times.
546

547
  """
548
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
549
                         "--separator=|",
550
                         "--options=lv_name,lv_size,devices,vg_name"])
551
  if result.failed:
552
    logging.error("Failed to list logical volumes, lvs output: %s",
553
                  result.output)
554
    return []
555

    
556
  def parse_dev(dev):
557
    if '(' in dev:
558
      return dev.split('(')[0]
559
    else:
560
      return dev
561

    
562
  def map_line(line):
563
    return {
564
      'name': line[0].strip(),
565
      'size': line[1].strip(),
566
      'dev': parse_dev(line[2].strip()),
567
      'vg': line[3].strip(),
568
    }
569

    
570
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
571
          if line.count('|') >= 3]
572

    
573

    
574
def BridgesExist(bridges_list):
575
  """Check if a list of bridges exist on the current node.
576

577
  @rtype: boolean
578
  @return: C{True} if all of them exist, C{False} otherwise
579

580
  """
581
  missing = []
582
  for bridge in bridges_list:
583
    if not utils.BridgeExists(bridge):
584
      missing.append(bridge)
585

    
586
  if missing:
587
    return False, "Missing bridges %s" % (", ".join(missing),)
588

    
589
  return True, None
590

    
591

    
592
def GetInstanceList(hypervisor_list):
593
  """Provides a list of instances.
594

595
  @type hypervisor_list: list
596
  @param hypervisor_list: the list of hypervisors to query information
597

598
  @rtype: list
599
  @return: a list of all running instances on the current node
600
    - instance1.example.com
601
    - instance2.example.com
602

603
  """
604
  results = []
605
  for hname in hypervisor_list:
606
    try:
607
      names = hypervisor.GetHypervisor(hname).ListInstances()
608
      results.extend(names)
609
    except errors.HypervisorError, err:
610
      _Fail("Error enumerating instances (hypervisor %s): %s",
611
            hname, err, exc=True)
612

    
613
  return results
614

    
615

    
616
def GetInstanceInfo(instance, hname):
617
  """Gives back the informations about an instance as a dictionary.
618

619
  @type instance: string
620
  @param instance: the instance name
621
  @type hname: string
622
  @param hname: the hypervisor type of the instance
623

624
  @rtype: dict
625
  @return: dictionary with the following keys:
626
      - memory: memory size of instance (int)
627
      - state: xen state of instance (string)
628
      - time: cpu time of instance (float)
629

630
  """
631
  output = {}
632

    
633
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
634
  if iinfo is not None:
635
    output['memory'] = iinfo[2]
636
    output['state'] = iinfo[4]
637
    output['time'] = iinfo[5]
638

    
639
  return True, output
640

    
641

    
642
def GetInstanceMigratable(instance):
643
  """Gives whether an instance can be migrated.
644

645
  @type instance: L{objects.Instance}
646
  @param instance: object representing the instance to be checked.
647

648
  @rtype: tuple
649
  @return: tuple of (result, description) where:
650
      - result: whether the instance can be migrated or not
651
      - description: a description of the issue, if relevant
652

653
  """
654
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
655
  if instance.name not in hyper.ListInstances():
656
    return (False, 'not running')
657

    
658
  for idx in range(len(instance.disks)):
659
    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
660
    if not os.path.islink(link_name):
661
      return (False, 'not restarted since ganeti 1.2.5')
662

    
663
  return (True, '')
664

    
665

    
666
def GetAllInstancesInfo(hypervisor_list):
667
  """Gather data about all instances.
668

669
  This is the equivalent of L{GetInstanceInfo}, except that it
670
  computes data for all instances at once, thus being faster if one
671
  needs data about more than one instance.
672

673
  @type hypervisor_list: list
674
  @param hypervisor_list: list of hypervisors to query for instance data
675

676
  @rtype: dict
677
  @return: dictionary of instance: data, with data having the following keys:
678
      - memory: memory size of instance (int)
679
      - state: xen state of instance (string)
680
      - time: cpu time of instance (float)
681
      - vcpus: the number of vcpus
682

683
  """
684
  output = {}
685

    
686
  for hname in hypervisor_list:
687
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
688
    if iinfo:
689
      for name, inst_id, memory, vcpus, state, times in iinfo:
690
        value = {
691
          'memory': memory,
692
          'vcpus': vcpus,
693
          'state': state,
694
          'time': times,
695
          }
696
        if name in output:
697
          # we only check static parameters, like memory and vcpus,
698
          # and not state and time which can change between the
699
          # invocations of the different hypervisors
700
          for key in 'memory', 'vcpus':
701
            if value[key] != output[name][key]:
702
              _Fail("Instance %s is running twice"
703
                    " with different parameters", name)
704
        output[name] = value
705

    
706
  return True, output
707

    
708

    
709
def InstanceOsAdd(instance, reinstall):
710
  """Add an OS to an instance.
711

712
  @type instance: L{objects.Instance}
713
  @param instance: Instance whose OS is to be installed
714
  @type reinstall: boolean
715
  @param reinstall: whether this is an instance reinstall
716
  @rtype: boolean
717
  @return: the success of the operation
718

719
  """
720
  try:
721
    inst_os = OSFromDisk(instance.os)
722
  except errors.InvalidOS, err:
723
    os_name, os_dir, os_err = err.args
724
    if os_dir is None:
725
      return (False, "Can't find OS '%s': %s" % (os_name, os_err))
726
    else:
727
      return (False, "Error parsing OS '%s' in directory %s: %s" %
728
              (os_name, os_dir, os_err))
729

    
730
  create_env = OSEnvironment(instance)
731
  if reinstall:
732
    create_env['INSTANCE_REINSTALL'] = "1"
733

    
734
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
735
                                     instance.name, int(time.time()))
736

    
737
  result = utils.RunCmd([inst_os.create_script], env=create_env,
738
                        cwd=inst_os.path, output=logfile,)
739
  if result.failed:
740
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
741
                  " output: %s", result.cmd, result.fail_reason, logfile,
742
                  result.output)
743
    lines = [utils.SafeEncode(val)
744
             for val in utils.TailFile(logfile, lines=20)]
745
    return (False, "OS create script failed (%s), last lines in the"
746
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
747

    
748
  return (True, "Successfully installed")
749

    
750

    
751
def RunRenameInstance(instance, old_name):
752
  """Run the OS rename script for an instance.
753

754
  @type instance: L{objects.Instance}
755
  @param instance: Instance whose OS is to be installed
756
  @type old_name: string
757
  @param old_name: previous instance name
758
  @rtype: boolean
759
  @return: the success of the operation
760

761
  """
762
  inst_os = OSFromDisk(instance.os)
763

    
764
  rename_env = OSEnvironment(instance)
765
  rename_env['OLD_INSTANCE_NAME'] = old_name
766

    
767
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
768
                                           old_name,
769
                                           instance.name, int(time.time()))
770

    
771
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
772
                        cwd=inst_os.path, output=logfile)
773

    
774
  if result.failed:
775
    logging.error("os create command '%s' returned error: %s output: %s",
776
                  result.cmd, result.fail_reason, result.output)
777
    lines = [utils.SafeEncode(val)
778
             for val in utils.TailFile(logfile, lines=20)]
779
    return (False, "OS rename script failed (%s), last lines in the"
780
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
781

    
782
  return (True, "Rename successful")
783

    
784

    
785
def _GetVGInfo(vg_name):
786
  """Get informations about the volume group.
787

788
  @type vg_name: str
789
  @param vg_name: the volume group which we query
790
  @rtype: dict
791
  @return:
792
    A dictionary with the following keys:
793
      - C{vg_size} is the total size of the volume group in MiB
794
      - C{vg_free} is the free size of the volume group in MiB
795
      - C{pv_count} are the number of physical disks in that VG
796

797
    If an error occurs during gathering of data, we return the same dict
798
    with keys all set to None.
799

800
  """
801
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
802

    
803
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
804
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
805

    
806
  if retval.failed:
807
    logging.error("volume group %s not present", vg_name)
808
    return retdic
809
  valarr = retval.stdout.strip().rstrip(':').split(':')
810
  if len(valarr) == 3:
811
    try:
812
      retdic = {
813
        "vg_size": int(round(float(valarr[0]), 0)),
814
        "vg_free": int(round(float(valarr[1]), 0)),
815
        "pv_count": int(valarr[2]),
816
        }
817
    except ValueError, err:
818
      logging.exception("Fail to parse vgs output")
819
  else:
820
    logging.error("vgs output has the wrong number of fields (expected"
821
                  " three): %s", str(valarr))
822
  return retdic
823

    
824

    
825
def _GetBlockDevSymlinkPath(instance_name, idx):
826
  return os.path.join(constants.DISK_LINKS_DIR,
827
                      "%s:%d" % (instance_name, idx))
828

    
829

    
830
def _SymlinkBlockDev(instance_name, device_path, idx):
831
  """Set up symlinks to a instance's block device.
832

833
  This is an auxiliary function run when an instance is start (on the primary
834
  node) or when an instance is migrated (on the target node).
835

836

837
  @param instance_name: the name of the target instance
838
  @param device_path: path of the physical block device, on the node
839
  @param idx: the disk index
840
  @return: absolute path to the disk's symlink
841

842
  """
843
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
844
  try:
845
    os.symlink(device_path, link_name)
846
  except OSError, err:
847
    if err.errno == errno.EEXIST:
848
      if (not os.path.islink(link_name) or
849
          os.readlink(link_name) != device_path):
850
        os.remove(link_name)
851
        os.symlink(device_path, link_name)
852
    else:
853
      raise
854

    
855
  return link_name
856

    
857

    
858
def _RemoveBlockDevLinks(instance_name, disks):
859
  """Remove the block device symlinks belonging to the given instance.
860

861
  """
862
  for idx, disk in enumerate(disks):
863
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
864
    if os.path.islink(link_name):
865
      try:
866
        os.remove(link_name)
867
      except OSError:
868
        logging.exception("Can't remove symlink '%s'", link_name)
869

    
870

    
871
def _GatherAndLinkBlockDevs(instance):
872
  """Set up an instance's block device(s).
873

874
  This is run on the primary node at instance startup. The block
875
  devices must be already assembled.
876

877
  @type instance: L{objects.Instance}
878
  @param instance: the instance whose disks we shoul assemble
879
  @rtype: list
880
  @return: list of (disk_object, device_path)
881

882
  """
883
  block_devices = []
884
  for idx, disk in enumerate(instance.disks):
885
    device = _RecursiveFindBD(disk)
886
    if device is None:
887
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
888
                                    str(disk))
889
    device.Open()
890
    try:
891
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
892
    except OSError, e:
893
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
894
                                    e.strerror)
895

    
896
    block_devices.append((disk, link_name))
897

    
898
  return block_devices
899

    
900

    
901
def StartInstance(instance):
902
  """Start an instance.
903

904
  @type instance: L{objects.Instance}
905
  @param instance: the instance object
906
  @rtype: boolean
907
  @return: whether the startup was successful or not
908

909
  """
910
  running_instances = GetInstanceList([instance.hypervisor])
911

    
912
  if instance.name in running_instances:
913
    return (True, "Already running")
914

    
915
  try:
916
    block_devices = _GatherAndLinkBlockDevs(instance)
917
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
918
    hyper.StartInstance(instance, block_devices)
919
  except errors.BlockDeviceError, err:
920
    _Fail("Block device error: %s", err, exc=True)
921
  except errors.HypervisorError, err:
922
    _RemoveBlockDevLinks(instance.name, instance.disks)
923
    _Fail("Hypervisor error: %s", err, exc=True)
924

    
925
  return (True, "Instance started successfully")
926

    
927

    
928
def InstanceShutdown(instance):
929
  """Shut an instance down.
930

931
  @note: this functions uses polling with a hardcoded timeout.
932

933
  @type instance: L{objects.Instance}
934
  @param instance: the instance object
935
  @rtype: boolean
936
  @return: whether the startup was successful or not
937

938
  """
939
  hv_name = instance.hypervisor
940
  running_instances = GetInstanceList([hv_name])
941

    
942
  if instance.name not in running_instances:
943
    return (True, "Instance already stopped")
944

    
945
  hyper = hypervisor.GetHypervisor(hv_name)
946
  try:
947
    hyper.StopInstance(instance)
948
  except errors.HypervisorError, err:
949
    _Fail("Failed to stop instance %s: %s", instance.name, err)
950

    
951
  # test every 10secs for 2min
952

    
953
  time.sleep(1)
954
  for dummy in range(11):
955
    if instance.name not in GetInstanceList([hv_name]):
956
      break
957
    time.sleep(10)
958
  else:
959
    # the shutdown did not succeed
960
    logging.error("Shutdown of '%s' unsuccessful, using destroy",
961
                  instance.name)
962

    
963
    try:
964
      hyper.StopInstance(instance, force=True)
965
    except errors.HypervisorError, err:
966
      _Fail("Failed to force stop instance %s: %s", instance.name, err)
967

    
968
    time.sleep(1)
969
    if instance.name in GetInstanceList([hv_name]):
970
      _Fail("Could not shutdown instance %s even by destroy", instance.name)
971

    
972
  _RemoveBlockDevLinks(instance.name, instance.disks)
973

    
974
  return (True, "Instance has been shutdown successfully")
975

    
976

    
977
def InstanceReboot(instance, reboot_type):
978
  """Reboot an instance.
979

980
  @type instance: L{objects.Instance}
981
  @param instance: the instance object to reboot
982
  @type reboot_type: str
983
  @param reboot_type: the type of reboot, one the following
984
    constants:
985
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
986
        instance OS, do not recreate the VM
987
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
988
        restart the VM (at the hypervisor level)
989
      - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
990
        is not accepted here, since that mode is handled
991
        differently
992
  @rtype: boolean
993
  @return: the success of the operation
994

995
  """
996
  running_instances = GetInstanceList([instance.hypervisor])
997

    
998
  if instance.name not in running_instances:
999
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1000

    
1001
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1002
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1003
    try:
1004
      hyper.RebootInstance(instance)
1005
    except errors.HypervisorError, err:
1006
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1007
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1008
    try:
1009
      stop_result = InstanceShutdown(instance)
1010
      if not stop_result[0]:
1011
        return stop_result
1012
      return StartInstance(instance)
1013
    except errors.HypervisorError, err:
1014
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1015
  else:
1016
    _Fail("Invalid reboot_type received: %s", reboot_type)
1017

    
1018
  return (True, "Reboot successful")
1019

    
1020

    
1021
def MigrationInfo(instance):
1022
  """Gather information about an instance to be migrated.
1023

1024
  @type instance: L{objects.Instance}
1025
  @param instance: the instance definition
1026

1027
  """
1028
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1029
  try:
1030
    info = hyper.MigrationInfo(instance)
1031
  except errors.HypervisorError, err:
1032
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1033
  return (True, info)
1034

    
1035

    
1036
def AcceptInstance(instance, info, target):
1037
  """Prepare the node to accept an instance.
1038

1039
  @type instance: L{objects.Instance}
1040
  @param instance: the instance definition
1041
  @type info: string/data (opaque)
1042
  @param info: migration information, from the source node
1043
  @type target: string
1044
  @param target: target host (usually ip), on this node
1045

1046
  """
1047
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1048
  try:
1049
    hyper.AcceptInstance(instance, info, target)
1050
  except errors.HypervisorError, err:
1051
    _Fail("Failed to accept instance: %s", err, exc=True)
1052
  return (True, "Accept successfull")
1053

    
1054

    
1055
def FinalizeMigration(instance, info, success):
1056
  """Finalize any preparation to accept an instance.
1057

1058
  @type instance: L{objects.Instance}
1059
  @param instance: the instance definition
1060
  @type info: string/data (opaque)
1061
  @param info: migration information, from the source node
1062
  @type success: boolean
1063
  @param success: whether the migration was a success or a failure
1064

1065
  """
1066
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1067
  try:
1068
    hyper.FinalizeMigration(instance, info, success)
1069
  except errors.HypervisorError, err:
1070
    _Fail("Failed to finalize migration: %s", err, exc=True)
1071
  return (True, "Migration Finalized")
1072

    
1073

    
1074
def MigrateInstance(instance, target, live):
1075
  """Migrates an instance to another node.
1076

1077
  @type instance: L{objects.Instance}
1078
  @param instance: the instance definition
1079
  @type target: string
1080
  @param target: the target node name
1081
  @type live: boolean
1082
  @param live: whether the migration should be done live or not (the
1083
      interpretation of this parameter is left to the hypervisor)
1084
  @rtype: tuple
1085
  @return: a tuple of (success, msg) where:
1086
      - succes is a boolean denoting the success/failure of the operation
1087
      - msg is a string with details in case of failure
1088

1089
  """
1090
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1091

    
1092
  try:
1093
    hyper.MigrateInstance(instance.name, target, live)
1094
  except errors.HypervisorError, err:
1095
    _Fail("Failed to migrate instance: %s", err, exc=True)
1096
  return (True, "Migration successfull")
1097

    
1098

    
1099
def BlockdevCreate(disk, size, owner, on_primary, info):
1100
  """Creates a block device for an instance.
1101

1102
  @type disk: L{objects.Disk}
1103
  @param disk: the object describing the disk we should create
1104
  @type size: int
1105
  @param size: the size of the physical underlying device, in MiB
1106
  @type owner: str
1107
  @param owner: the name of the instance for which disk is created,
1108
      used for device cache data
1109
  @type on_primary: boolean
1110
  @param on_primary:  indicates if it is the primary node or not
1111
  @type info: string
1112
  @param info: string that will be sent to the physical device
1113
      creation, used for example to set (LVM) tags on LVs
1114

1115
  @return: the new unique_id of the device (this can sometime be
1116
      computed only after creation), or None. On secondary nodes,
1117
      it's not required to return anything.
1118

1119
  """
1120
  clist = []
1121
  if disk.children:
1122
    for child in disk.children:
1123
      try:
1124
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1125
      except errors.BlockDeviceError, err:
1126
        _Fail("Can't assemble device %s: %s", child, err)
1127
      if on_primary or disk.AssembleOnSecondary():
1128
        # we need the children open in case the device itself has to
1129
        # be assembled
1130
        try:
1131
          crdev.Open()
1132
        except errors.BlockDeviceError, err:
1133
          _Fail("Can't make child '%s' read-write: %s", child, err)
1134
      clist.append(crdev)
1135

    
1136
  try:
1137
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1138
  except errors.BlockDeviceError, err:
1139
    _Fail("Can't create block device: %s", err)
1140

    
1141
  if on_primary or disk.AssembleOnSecondary():
1142
    try:
1143
      device.Assemble()
1144
    except errors.BlockDeviceError, err:
1145
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1146
    device.SetSyncSpeed(constants.SYNC_SPEED)
1147
    if on_primary or disk.OpenOnSecondary():
1148
      try:
1149
        device.Open(force=True)
1150
      except errors.BlockDeviceError, err:
1151
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1152
    DevCacheManager.UpdateCache(device.dev_path, owner,
1153
                                on_primary, disk.iv_name)
1154

    
1155
  device.SetInfo(info)
1156

    
1157
  physical_id = device.unique_id
1158
  return True, physical_id
1159

    
1160

    
1161
def BlockdevRemove(disk):
1162
  """Remove a block device.
1163

1164
  @note: This is intended to be called recursively.
1165

1166
  @type disk: L{objects.Disk}
1167
  @param disk: the disk object we should remove
1168
  @rtype: boolean
1169
  @return: the success of the operation
1170

1171
  """
1172
  msgs = []
1173
  result = True
1174
  try:
1175
    rdev = _RecursiveFindBD(disk)
1176
  except errors.BlockDeviceError, err:
1177
    # probably can't attach
1178
    logging.info("Can't attach to device %s in remove", disk)
1179
    rdev = None
1180
  if rdev is not None:
1181
    r_path = rdev.dev_path
1182
    try:
1183
      rdev.Remove()
1184
    except errors.BlockDeviceError, err:
1185
      msgs.append(str(err))
1186
      result = False
1187
    if result:
1188
      DevCacheManager.RemoveCache(r_path)
1189

    
1190
  if disk.children:
1191
    for child in disk.children:
1192
      c_status, c_msg = BlockdevRemove(child)
1193
      result = result and c_status
1194
      if c_msg: # not an empty message
1195
        msgs.append(c_msg)
1196

    
1197
  return (result, "; ".join(msgs))
1198

    
1199

    
1200
def _RecursiveAssembleBD(disk, owner, as_primary):
1201
  """Activate a block device for an instance.
1202

1203
  This is run on the primary and secondary nodes for an instance.
1204

1205
  @note: this function is called recursively.
1206

1207
  @type disk: L{objects.Disk}
1208
  @param disk: the disk we try to assemble
1209
  @type owner: str
1210
  @param owner: the name of the instance which owns the disk
1211
  @type as_primary: boolean
1212
  @param as_primary: if we should make the block device
1213
      read/write
1214

1215
  @return: the assembled device or None (in case no device
1216
      was assembled)
1217
  @raise errors.BlockDeviceError: in case there is an error
1218
      during the activation of the children or the device
1219
      itself
1220

1221
  """
1222
  children = []
1223
  if disk.children:
1224
    mcn = disk.ChildrenNeeded()
1225
    if mcn == -1:
1226
      mcn = 0 # max number of Nones allowed
1227
    else:
1228
      mcn = len(disk.children) - mcn # max number of Nones
1229
    for chld_disk in disk.children:
1230
      try:
1231
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1232
      except errors.BlockDeviceError, err:
1233
        if children.count(None) >= mcn:
1234
          raise
1235
        cdev = None
1236
        logging.error("Error in child activation (but continuing): %s",
1237
                      str(err))
1238
      children.append(cdev)
1239

    
1240
  if as_primary or disk.AssembleOnSecondary():
1241
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1242
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1243
    result = r_dev
1244
    if as_primary or disk.OpenOnSecondary():
1245
      r_dev.Open()
1246
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1247
                                as_primary, disk.iv_name)
1248

    
1249
  else:
1250
    result = True
1251
  return result
1252

    
1253

    
1254
def BlockdevAssemble(disk, owner, as_primary):
1255
  """Activate a block device for an instance.
1256

1257
  This is a wrapper over _RecursiveAssembleBD.
1258

1259
  @rtype: str or boolean
1260
  @return: a C{/dev/...} path for primary nodes, and
1261
      C{True} for secondary nodes
1262

1263
  """
1264
  status = True
1265
  result = "no error information"
1266
  try:
1267
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1268
    if isinstance(result, bdev.BlockDev):
1269
      result = result.dev_path
1270
  except errors.BlockDeviceError, err:
1271
    result = "Error while assembling disk: %s" % str(err)
1272
    status = False
1273
  return (status, result)
1274

    
1275

    
1276
def BlockdevShutdown(disk):
1277
  """Shut down a block device.
1278

1279
  First, if the device is assembled (Attach() is successfull), then
1280
  the device is shutdown. Then the children of the device are
1281
  shutdown.
1282

1283
  This function is called recursively. Note that we don't cache the
1284
  children or such, as oppossed to assemble, shutdown of different
1285
  devices doesn't require that the upper device was active.
1286

1287
  @type disk: L{objects.Disk}
1288
  @param disk: the description of the disk we should
1289
      shutdown
1290
  @rtype: boolean
1291
  @return: the success of the operation
1292

1293
  """
1294
  msgs = []
1295
  result = True
1296
  r_dev = _RecursiveFindBD(disk)
1297
  if r_dev is not None:
1298
    r_path = r_dev.dev_path
1299
    try:
1300
      r_dev.Shutdown()
1301
      DevCacheManager.RemoveCache(r_path)
1302
    except errors.BlockDeviceError, err:
1303
      msgs.append(str(err))
1304
      result = False
1305

    
1306
  if disk.children:
1307
    for child in disk.children:
1308
      c_status, c_msg = BlockdevShutdown(child)
1309
      result = result and c_status
1310
      if c_msg: # not an empty message
1311
        msgs.append(c_msg)
1312

    
1313
  return (result, "; ".join(msgs))
1314

    
1315

    
1316
def BlockdevAddchildren(parent_cdev, new_cdevs):
1317
  """Extend a mirrored block device.
1318

1319
  @type parent_cdev: L{objects.Disk}
1320
  @param parent_cdev: the disk to which we should add children
1321
  @type new_cdevs: list of L{objects.Disk}
1322
  @param new_cdevs: the list of children which we should add
1323
  @rtype: boolean
1324
  @return: the success of the operation
1325

1326
  """
1327
  parent_bdev = _RecursiveFindBD(parent_cdev)
1328
  if parent_bdev is None:
1329
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
1330
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1331
  if new_bdevs.count(None) > 0:
1332
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1333
  parent_bdev.AddChildren(new_bdevs)
1334
  return (True, None)
1335

    
1336

    
1337
def BlockdevRemovechildren(parent_cdev, new_cdevs):
1338
  """Shrink a mirrored block device.
1339

1340
  @type parent_cdev: L{objects.Disk}
1341
  @param parent_cdev: the disk from which we should remove children
1342
  @type new_cdevs: list of L{objects.Disk}
1343
  @param new_cdevs: the list of children which we should remove
1344
  @rtype: boolean
1345
  @return: the success of the operation
1346

1347
  """
1348
  parent_bdev = _RecursiveFindBD(parent_cdev)
1349
  if parent_bdev is None:
1350
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1351
  devs = []
1352
  for disk in new_cdevs:
1353
    rpath = disk.StaticDevPath()
1354
    if rpath is None:
1355
      bd = _RecursiveFindBD(disk)
1356
      if bd is None:
1357
        _Fail("Can't find device %s while removing children", disk)
1358
      else:
1359
        devs.append(bd.dev_path)
1360
    else:
1361
      devs.append(rpath)
1362
  parent_bdev.RemoveChildren(devs)
1363
  return (True, None)
1364

    
1365

    
1366
def BlockdevGetmirrorstatus(disks):
1367
  """Get the mirroring status of a list of devices.
1368

1369
  @type disks: list of L{objects.Disk}
1370
  @param disks: the list of disks which we should query
1371
  @rtype: disk
1372
  @return:
1373
      a list of (mirror_done, estimated_time) tuples, which
1374
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
1375
  @raise errors.BlockDeviceError: if any of the disks cannot be
1376
      found
1377

1378
  """
1379
  stats = []
1380
  for dsk in disks:
1381
    rbd = _RecursiveFindBD(dsk)
1382
    if rbd is None:
1383
      _Fail("Can't find device %s", dsk)
1384
    stats.append(rbd.CombinedSyncStatus())
1385
  return True, stats
1386

    
1387

    
1388
def _RecursiveFindBD(disk):
1389
  """Check if a device is activated.
1390

1391
  If so, return informations about the real device.
1392

1393
  @type disk: L{objects.Disk}
1394
  @param disk: the disk object we need to find
1395

1396
  @return: None if the device can't be found,
1397
      otherwise the device instance
1398

1399
  """
1400
  children = []
1401
  if disk.children:
1402
    for chdisk in disk.children:
1403
      children.append(_RecursiveFindBD(chdisk))
1404

    
1405
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1406

    
1407

    
1408
def BlockdevFind(disk):
1409
  """Check if a device is activated.
1410

1411
  If it is, return informations about the real device.
1412

1413
  @type disk: L{objects.Disk}
1414
  @param disk: the disk to find
1415
  @rtype: None or tuple
1416
  @return: None if the disk cannot be found, otherwise a
1417
      tuple (device_path, major, minor, sync_percent,
1418
      estimated_time, is_degraded)
1419

1420
  """
1421
  try:
1422
    rbd = _RecursiveFindBD(disk)
1423
  except errors.BlockDeviceError, err:
1424
    _Fail("Failed to find device: %s", err, exc=True)
1425
  if rbd is None:
1426
    return (True, None)
1427
  return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1428

    
1429

    
1430
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1431
  """Write a file to the filesystem.
1432

1433
  This allows the master to overwrite(!) a file. It will only perform
1434
  the operation if the file belongs to a list of configuration files.
1435

1436
  @type file_name: str
1437
  @param file_name: the target file name
1438
  @type data: str
1439
  @param data: the new contents of the file
1440
  @type mode: int
1441
  @param mode: the mode to give the file (can be None)
1442
  @type uid: int
1443
  @param uid: the owner of the file (can be -1 for default)
1444
  @type gid: int
1445
  @param gid: the group of the file (can be -1 for default)
1446
  @type atime: float
1447
  @param atime: the atime to set on the file (can be None)
1448
  @type mtime: float
1449
  @param mtime: the mtime to set on the file (can be None)
1450
  @rtype: boolean
1451
  @return: the success of the operation; errors are logged
1452
      in the node daemon log
1453

1454
  """
1455
  if not os.path.isabs(file_name):
1456
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1457

    
1458
  allowed_files = set([
1459
    constants.CLUSTER_CONF_FILE,
1460
    constants.ETC_HOSTS,
1461
    constants.SSH_KNOWN_HOSTS_FILE,
1462
    constants.VNC_PASSWORD_FILE,
1463
    constants.RAPI_CERT_FILE,
1464
    constants.RAPI_USERS_FILE,
1465
    ])
1466

    
1467
  for hv_name in constants.HYPER_TYPES:
1468
    hv_class = hypervisor.GetHypervisor(hv_name)
1469
    allowed_files.update(hv_class.GetAncillaryFiles())
1470

    
1471
  if file_name not in allowed_files:
1472
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1473
          file_name)
1474

    
1475
  raw_data = _Decompress(data)
1476

    
1477
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1478
                  atime=atime, mtime=mtime)
1479
  return (True, "success")
1480

    
1481

    
1482
def WriteSsconfFiles(values):
1483
  """Update all ssconf files.
1484

1485
  Wrapper around the SimpleStore.WriteFiles.
1486

1487
  """
1488
  ssconf.SimpleStore().WriteFiles(values)
1489

    
1490

    
1491
def _ErrnoOrStr(err):
1492
  """Format an EnvironmentError exception.
1493

1494
  If the L{err} argument has an errno attribute, it will be looked up
1495
  and converted into a textual C{E...} description. Otherwise the
1496
  string representation of the error will be returned.
1497

1498
  @type err: L{EnvironmentError}
1499
  @param err: the exception to format
1500

1501
  """
1502
  if hasattr(err, 'errno'):
1503
    detail = errno.errorcode[err.errno]
1504
  else:
1505
    detail = str(err)
1506
  return detail
1507

    
1508

    
1509
def _OSOndiskVersion(name, os_dir):
1510
  """Compute and return the API version of a given OS.
1511

1512
  This function will try to read the API version of the OS given by
1513
  the 'name' parameter and residing in the 'os_dir' directory.
1514

1515
  @type name: str
1516
  @param name: the OS name we should look for
1517
  @type os_dir: str
1518
  @param os_dir: the directory inwhich we should look for the OS
1519
  @rtype: int or None
1520
  @return:
1521
      Either an integer denoting the version or None in the
1522
      case when this is not a valid OS name.
1523
  @raise errors.InvalidOS: if the OS cannot be found
1524

1525
  """
1526
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1527

    
1528
  try:
1529
    st = os.stat(api_file)
1530
  except EnvironmentError, err:
1531
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1532
                           " found (%s)" % _ErrnoOrStr(err))
1533

    
1534
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1535
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1536
                           " a regular file")
1537

    
1538
  try:
1539
    f = open(api_file)
1540
    try:
1541
      api_versions = f.readlines()
1542
    finally:
1543
      f.close()
1544
  except EnvironmentError, err:
1545
    raise errors.InvalidOS(name, os_dir, "error while reading the"
1546
                           " API version (%s)" % _ErrnoOrStr(err))
1547

    
1548
  api_versions = [version.strip() for version in api_versions]
1549
  try:
1550
    api_versions = [int(version) for version in api_versions]
1551
  except (TypeError, ValueError), err:
1552
    raise errors.InvalidOS(name, os_dir,
1553
                           "API version is not integer (%s)" % str(err))
1554

    
1555
  return api_versions
1556

    
1557

    
1558
def DiagnoseOS(top_dirs=None):
1559
  """Compute the validity for all OSes.
1560

1561
  @type top_dirs: list
1562
  @param top_dirs: the list of directories in which to
1563
      search (if not given defaults to
1564
      L{constants.OS_SEARCH_PATH})
1565
  @rtype: list of L{objects.OS}
1566
  @return: an OS object for each name in all the given
1567
      directories
1568

1569
  """
1570
  if top_dirs is None:
1571
    top_dirs = constants.OS_SEARCH_PATH
1572

    
1573
  result = []
1574
  for dir_name in top_dirs:
1575
    if os.path.isdir(dir_name):
1576
      try:
1577
        f_names = utils.ListVisibleFiles(dir_name)
1578
      except EnvironmentError, err:
1579
        logging.exception("Can't list the OS directory %s", dir_name)
1580
        break
1581
      for name in f_names:
1582
        try:
1583
          os_inst = OSFromDisk(name, base_dir=dir_name)
1584
          result.append(os_inst)
1585
        except errors.InvalidOS, err:
1586
          result.append(objects.OS.FromInvalidOS(err))
1587

    
1588
  return result
1589

    
1590

    
1591
def OSFromDisk(name, base_dir=None):
1592
  """Create an OS instance from disk.
1593

1594
  This function will return an OS instance if the given name is a
1595
  valid OS name. Otherwise, it will raise an appropriate
1596
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1597

1598
  @type base_dir: string
1599
  @keyword base_dir: Base directory containing OS installations.
1600
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1601
  @rtype: L{objects.OS}
1602
  @return: the OS instance if we find a valid one
1603
  @raise errors.InvalidOS: if we don't find a valid OS
1604

1605
  """
1606
  if base_dir is None:
1607
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1608
    if os_dir is None:
1609
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
1610
  else:
1611
    os_dir = os.path.sep.join([base_dir, name])
1612

    
1613
  api_versions = _OSOndiskVersion(name, os_dir)
1614

    
1615
  if constants.OS_API_VERSION not in api_versions:
1616
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
1617
                           " (found %s want %s)"
1618
                           % (api_versions, constants.OS_API_VERSION))
1619

    
1620
  # OS Scripts dictionary, we will populate it with the actual script names
1621
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1622

    
1623
  for script in os_scripts:
1624
    os_scripts[script] = os.path.sep.join([os_dir, script])
1625

    
1626
    try:
1627
      st = os.stat(os_scripts[script])
1628
    except EnvironmentError, err:
1629
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1630
                             (script, _ErrnoOrStr(err)))
1631

    
1632
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1633
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1634
                             script)
1635

    
1636
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1637
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1638
                             script)
1639

    
1640

    
1641
  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1642
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1643
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1644
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1645
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1646
                    api_versions=api_versions)
1647

    
1648
def OSEnvironment(instance, debug=0):
1649
  """Calculate the environment for an os script.
1650

1651
  @type instance: L{objects.Instance}
1652
  @param instance: target instance for the os script run
1653
  @type debug: integer
1654
  @param debug: debug level (0 or 1, for OS Api 10)
1655
  @rtype: dict
1656
  @return: dict of environment variables
1657
  @raise errors.BlockDeviceError: if the block device
1658
      cannot be found
1659

1660
  """
1661
  result = {}
1662
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1663
  result['INSTANCE_NAME'] = instance.name
1664
  result['INSTANCE_OS'] = instance.os
1665
  result['HYPERVISOR'] = instance.hypervisor
1666
  result['DISK_COUNT'] = '%d' % len(instance.disks)
1667
  result['NIC_COUNT'] = '%d' % len(instance.nics)
1668
  result['DEBUG_LEVEL'] = '%d' % debug
1669
  for idx, disk in enumerate(instance.disks):
1670
    real_disk = _RecursiveFindBD(disk)
1671
    if real_disk is None:
1672
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
1673
                                    str(disk))
1674
    real_disk.Open()
1675
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
1676
    result['DISK_%d_ACCESS' % idx] = disk.mode
1677
    if constants.HV_DISK_TYPE in instance.hvparams:
1678
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
1679
        instance.hvparams[constants.HV_DISK_TYPE]
1680
    if disk.dev_type in constants.LDS_BLOCK:
1681
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1682
    elif disk.dev_type == constants.LD_FILE:
1683
      result['DISK_%d_BACKEND_TYPE' % idx] = \
1684
        'file:%s' % disk.physical_id[0]
1685
  for idx, nic in enumerate(instance.nics):
1686
    result['NIC_%d_MAC' % idx] = nic.mac
1687
    if nic.ip:
1688
      result['NIC_%d_IP' % idx] = nic.ip
1689
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1690
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1691
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1692
    if nic.nicparams[constants.NIC_LINK]:
1693
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1694
    if constants.HV_NIC_TYPE in instance.hvparams:
1695
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
1696
        instance.hvparams[constants.HV_NIC_TYPE]
1697

    
1698
  return result
1699

    
1700
def BlockdevGrow(disk, amount):
1701
  """Grow a stack of block devices.
1702

1703
  This function is called recursively, with the childrens being the
1704
  first ones to resize.
1705

1706
  @type disk: L{objects.Disk}
1707
  @param disk: the disk to be grown
1708
  @rtype: (status, result)
1709
  @return: a tuple with the status of the operation
1710
      (True/False), and the errors message if status
1711
      is False
1712

1713
  """
1714
  r_dev = _RecursiveFindBD(disk)
1715
  if r_dev is None:
1716
    return False, "Cannot find block device %s" % (disk,)
1717

    
1718
  try:
1719
    r_dev.Grow(amount)
1720
  except errors.BlockDeviceError, err:
1721
    _Fail("Failed to grow block device: %s", err, exc=True)
1722

    
1723
  return True, None
1724

    
1725

    
1726
def BlockdevSnapshot(disk):
1727
  """Create a snapshot copy of a block device.
1728

1729
  This function is called recursively, and the snapshot is actually created
1730
  just for the leaf lvm backend device.
1731

1732
  @type disk: L{objects.Disk}
1733
  @param disk: the disk to be snapshotted
1734
  @rtype: string
1735
  @return: snapshot disk path
1736

1737
  """
1738
  if disk.children:
1739
    if len(disk.children) == 1:
1740
      # only one child, let's recurse on it
1741
      return BlockdevSnapshot(disk.children[0])
1742
    else:
1743
      # more than one child, choose one that matches
1744
      for child in disk.children:
1745
        if child.size == disk.size:
1746
          # return implies breaking the loop
1747
          return BlockdevSnapshot(child)
1748
  elif disk.dev_type == constants.LD_LV:
1749
    r_dev = _RecursiveFindBD(disk)
1750
    if r_dev is not None:
1751
      # let's stay on the safe side and ask for the full size, for now
1752
      return True, r_dev.Snapshot(disk.size)
1753
    else:
1754
      _Fail("Cannot find block device %s", disk)
1755
  else:
1756
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1757
          disk.unique_id, disk.dev_type)
1758

    
1759

    
1760
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1761
  """Export a block device snapshot to a remote node.
1762

1763
  @type disk: L{objects.Disk}
1764
  @param disk: the description of the disk to export
1765
  @type dest_node: str
1766
  @param dest_node: the destination node to export to
1767
  @type instance: L{objects.Instance}
1768
  @param instance: the instance object to whom the disk belongs
1769
  @type cluster_name: str
1770
  @param cluster_name: the cluster name, needed for SSH hostalias
1771
  @type idx: int
1772
  @param idx: the index of the disk in the instance's disk list,
1773
      used to export to the OS scripts environment
1774
  @rtype: boolean
1775
  @return: the success of the operation
1776

1777
  """
1778
  export_env = OSEnvironment(instance)
1779

    
1780
  inst_os = OSFromDisk(instance.os)
1781
  export_script = inst_os.export_script
1782

    
1783
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1784
                                     instance.name, int(time.time()))
1785
  if not os.path.exists(constants.LOG_OS_DIR):
1786
    os.mkdir(constants.LOG_OS_DIR, 0750)
1787
  real_disk = _RecursiveFindBD(disk)
1788
  if real_disk is None:
1789
    _Fail("Block device '%s' is not set up", disk)
1790

    
1791
  real_disk.Open()
1792

    
1793
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
1794
  export_env['EXPORT_INDEX'] = str(idx)
1795

    
1796
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1797
  destfile = disk.physical_id[1]
1798

    
1799
  # the target command is built out of three individual commands,
1800
  # which are joined by pipes; we check each individual command for
1801
  # valid parameters
1802
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1803
                               export_script, logfile)
1804

    
1805
  comprcmd = "gzip"
1806

    
1807
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1808
                                destdir, destdir, destfile)
1809
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1810
                                                   constants.GANETI_RUNAS,
1811
                                                   destcmd)
1812

    
1813
  # all commands have been checked, so we're safe to combine them
1814
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1815

    
1816
  result = utils.RunCmd(command, env=export_env)
1817

    
1818
  if result.failed:
1819
    _Fail("OS snapshot export command '%s' returned error: %s"
1820
          " output: %s", command, result.fail_reason, result.output)
1821

    
1822
  return (True, None)
1823

    
1824

    
1825
def FinalizeExport(instance, snap_disks):
1826
  """Write out the export configuration information.
1827

1828
  @type instance: L{objects.Instance}
1829
  @param instance: the instance which we export, used for
1830
      saving configuration
1831
  @type snap_disks: list of L{objects.Disk}
1832
  @param snap_disks: list of snapshot block devices, which
1833
      will be used to get the actual name of the dump file
1834

1835
  @rtype: boolean
1836
  @return: the success of the operation
1837

1838
  """
1839
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1840
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1841

    
1842
  config = objects.SerializableConfigParser()
1843

    
1844
  config.add_section(constants.INISECT_EXP)
1845
  config.set(constants.INISECT_EXP, 'version', '0')
1846
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1847
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1848
  config.set(constants.INISECT_EXP, 'os', instance.os)
1849
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
1850

    
1851
  config.add_section(constants.INISECT_INS)
1852
  config.set(constants.INISECT_INS, 'name', instance.name)
1853
  config.set(constants.INISECT_INS, 'memory', '%d' %
1854
             instance.beparams[constants.BE_MEMORY])
1855
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
1856
             instance.beparams[constants.BE_VCPUS])
1857
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1858

    
1859
  nic_total = 0
1860
  for nic_count, nic in enumerate(instance.nics):
1861
    nic_total += 1
1862
    config.set(constants.INISECT_INS, 'nic%d_mac' %
1863
               nic_count, '%s' % nic.mac)
1864
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1865
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1866
               '%s' % nic.bridge)
1867
  # TODO: redundant: on load can read nics until it doesn't exist
1868
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1869

    
1870
  disk_total = 0
1871
  for disk_count, disk in enumerate(snap_disks):
1872
    if disk:
1873
      disk_total += 1
1874
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1875
                 ('%s' % disk.iv_name))
1876
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1877
                 ('%s' % disk.physical_id[1]))
1878
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1879
                 ('%d' % disk.size))
1880

    
1881
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1882

    
1883
  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1884
                  data=config.Dumps())
1885
  shutil.rmtree(finaldestdir, True)
1886
  shutil.move(destdir, finaldestdir)
1887

    
1888
  return True, None
1889

    
1890

    
1891
def ExportInfo(dest):
1892
  """Get export configuration information.
1893

1894
  @type dest: str
1895
  @param dest: directory containing the export
1896

1897
  @rtype: L{objects.SerializableConfigParser}
1898
  @return: a serializable config file containing the
1899
      export info
1900

1901
  """
1902
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1903

    
1904
  config = objects.SerializableConfigParser()
1905
  config.read(cff)
1906

    
1907
  if (not config.has_section(constants.INISECT_EXP) or
1908
      not config.has_section(constants.INISECT_INS)):
1909
    _Fail("Export info file doesn't have the required fields")
1910

    
1911
  return True, config.Dumps()
1912

    
1913

    
1914
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1915
  """Import an os image into an instance.
1916

1917
  @type instance: L{objects.Instance}
1918
  @param instance: instance to import the disks into
1919
  @type src_node: string
1920
  @param src_node: source node for the disk images
1921
  @type src_images: list of string
1922
  @param src_images: absolute paths of the disk images
1923
  @rtype: list of boolean
1924
  @return: each boolean represent the success of importing the n-th disk
1925

1926
  """
1927
  import_env = OSEnvironment(instance)
1928
  inst_os = OSFromDisk(instance.os)
1929
  import_script = inst_os.import_script
1930

    
1931
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1932
                                        instance.name, int(time.time()))
1933
  if not os.path.exists(constants.LOG_OS_DIR):
1934
    os.mkdir(constants.LOG_OS_DIR, 0750)
1935

    
1936
  comprcmd = "gunzip"
1937
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1938
                               import_script, logfile)
1939

    
1940
  final_result = []
1941
  for idx, image in enumerate(src_images):
1942
    if image:
1943
      destcmd = utils.BuildShellCmd('cat %s', image)
1944
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1945
                                                       constants.GANETI_RUNAS,
1946
                                                       destcmd)
1947
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1948
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1949
      import_env['IMPORT_INDEX'] = str(idx)
1950
      result = utils.RunCmd(command, env=import_env)
1951
      if result.failed:
1952
        logging.error("Disk import command '%s' returned error: %s"
1953
                      " output: %s", command, result.fail_reason,
1954
                      result.output)
1955
        final_result.append("error importing disk %d: %s, %s" %
1956
                            (idx, result.fail_reason, result.output[-100]))
1957

    
1958
  if final_result:
1959
    return False, "; ".join(final_result)
1960
  return True, None
1961

    
1962

    
1963
def ListExports():
1964
  """Return a list of exports currently available on this machine.
1965

1966
  @rtype: list
1967
  @return: list of the exports
1968

1969
  """
1970
  if os.path.isdir(constants.EXPORT_DIR):
1971
    return True, utils.ListVisibleFiles(constants.EXPORT_DIR)
1972
  else:
1973
    return False, "No exports directory"
1974

    
1975

    
1976
def RemoveExport(export):
1977
  """Remove an existing export from the node.
1978

1979
  @type export: str
1980
  @param export: the name of the export to remove
1981
  @rtype: boolean
1982
  @return: the success of the operation
1983

1984
  """
1985
  target = os.path.join(constants.EXPORT_DIR, export)
1986

    
1987
  try:
1988
    shutil.rmtree(target)
1989
  except EnvironmentError, err:
1990
    _Fail("Error while removing the export: %s", err, exc=True)
1991

    
1992
  return True, None
1993

    
1994

    
1995
def BlockdevRename(devlist):
1996
  """Rename a list of block devices.
1997

1998
  @type devlist: list of tuples
1999
  @param devlist: list of tuples of the form  (disk,
2000
      new_logical_id, new_physical_id); disk is an
2001
      L{objects.Disk} object describing the current disk,
2002
      and new logical_id/physical_id is the name we
2003
      rename it to
2004
  @rtype: boolean
2005
  @return: True if all renames succeeded, False otherwise
2006

2007
  """
2008
  msgs = []
2009
  result = True
2010
  for disk, unique_id in devlist:
2011
    dev = _RecursiveFindBD(disk)
2012
    if dev is None:
2013
      msgs.append("Can't find device %s in rename" % str(disk))
2014
      result = False
2015
      continue
2016
    try:
2017
      old_rpath = dev.dev_path
2018
      dev.Rename(unique_id)
2019
      new_rpath = dev.dev_path
2020
      if old_rpath != new_rpath:
2021
        DevCacheManager.RemoveCache(old_rpath)
2022
        # FIXME: we should add the new cache information here, like:
2023
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2024
        # but we don't have the owner here - maybe parse from existing
2025
        # cache? for now, we only lose lvm data when we rename, which
2026
        # is less critical than DRBD or MD
2027
    except errors.BlockDeviceError, err:
2028
      msgs.append("Can't rename device '%s' to '%s': %s" %
2029
                  (dev, unique_id, err))
2030
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2031
      result = False
2032
  return (result, "; ".join(msgs))
2033

    
2034

    
2035
def _TransformFileStorageDir(file_storage_dir):
2036
  """Checks whether given file_storage_dir is valid.
2037

2038
  Checks wheter the given file_storage_dir is within the cluster-wide
2039
  default file_storage_dir stored in SimpleStore. Only paths under that
2040
  directory are allowed.
2041

2042
  @type file_storage_dir: str
2043
  @param file_storage_dir: the path to check
2044

2045
  @return: the normalized path if valid, None otherwise
2046

2047
  """
2048
  cfg = _GetConfig()
2049
  file_storage_dir = os.path.normpath(file_storage_dir)
2050
  base_file_storage_dir = cfg.GetFileStorageDir()
2051
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2052
      base_file_storage_dir):
2053
    logging.error("file storage directory '%s' is not under base file"
2054
                  " storage directory '%s'",
2055
                  file_storage_dir, base_file_storage_dir)
2056
    return None
2057
  return file_storage_dir
2058

    
2059

    
2060
def CreateFileStorageDir(file_storage_dir):
2061
  """Create file storage directory.
2062

2063
  @type file_storage_dir: str
2064
  @param file_storage_dir: directory to create
2065

2066
  @rtype: tuple
2067
  @return: tuple with first element a boolean indicating wheter dir
2068
      creation was successful or not
2069

2070
  """
2071
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2072
  result = True,
2073
  if not file_storage_dir:
2074
    result = False,
2075
  else:
2076
    if os.path.exists(file_storage_dir):
2077
      if not os.path.isdir(file_storage_dir):
2078
        logging.error("'%s' is not a directory", file_storage_dir)
2079
        result = False,
2080
    else:
2081
      try:
2082
        os.makedirs(file_storage_dir, 0750)
2083
      except OSError, err:
2084
        logging.error("Cannot create file storage directory '%s': %s",
2085
                      file_storage_dir, err)
2086
        result = False,
2087
  return result
2088

    
2089

    
2090
def RemoveFileStorageDir(file_storage_dir):
2091
  """Remove file storage directory.
2092

2093
  Remove it only if it's empty. If not log an error and return.
2094

2095
  @type file_storage_dir: str
2096
  @param file_storage_dir: the directory we should cleanup
2097
  @rtype: tuple (success,)
2098
  @return: tuple of one element, C{success}, denoting
2099
      whether the operation was successfull
2100

2101
  """
2102
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2103
  result = True,
2104
  if not file_storage_dir:
2105
    result = False,
2106
  else:
2107
    if os.path.exists(file_storage_dir):
2108
      if not os.path.isdir(file_storage_dir):
2109
        logging.error("'%s' is not a directory", file_storage_dir)
2110
        result = False,
2111
      # deletes dir only if empty, otherwise we want to return False
2112
      try:
2113
        os.rmdir(file_storage_dir)
2114
      except OSError, err:
2115
        logging.exception("Cannot remove file storage directory '%s'",
2116
                          file_storage_dir)
2117
        result = False,
2118
  return result
2119

    
2120

    
2121
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2122
  """Rename the file storage directory.
2123

2124
  @type old_file_storage_dir: str
2125
  @param old_file_storage_dir: the current path
2126
  @type new_file_storage_dir: str
2127
  @param new_file_storage_dir: the name we should rename to
2128
  @rtype: tuple (success,)
2129
  @return: tuple of one element, C{success}, denoting
2130
      whether the operation was successful
2131

2132
  """
2133
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2134
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2135
  result = True,
2136
  if not old_file_storage_dir or not new_file_storage_dir:
2137
    result = False,
2138
  else:
2139
    if not os.path.exists(new_file_storage_dir):
2140
      if os.path.isdir(old_file_storage_dir):
2141
        try:
2142
          os.rename(old_file_storage_dir, new_file_storage_dir)
2143
        except OSError, err:
2144
          logging.exception("Cannot rename '%s' to '%s'",
2145
                            old_file_storage_dir, new_file_storage_dir)
2146
          result =  False,
2147
      else:
2148
        logging.error("'%s' is not a directory", old_file_storage_dir)
2149
        result = False,
2150
    else:
2151
      if os.path.exists(old_file_storage_dir):
2152
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2153
                      old_file_storage_dir, new_file_storage_dir)
2154
        result = False,
2155
  return result
2156

    
2157

    
2158
def _IsJobQueueFile(file_name):
2159
  """Checks whether the given filename is in the queue directory.
2160

2161
  @type file_name: str
2162
  @param file_name: the file name we should check
2163
  @rtype: boolean
2164
  @return: whether the file is under the queue directory
2165

2166
  """
2167
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
2168
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2169

    
2170
  if not result:
2171
    logging.error("'%s' is not a file in the queue directory",
2172
                  file_name)
2173

    
2174
  return result
2175

    
2176

    
2177
def JobQueueUpdate(file_name, content):
2178
  """Updates a file in the queue directory.
2179

2180
  This is just a wrapper over L{utils.WriteFile}, with proper
2181
  checking.
2182

2183
  @type file_name: str
2184
  @param file_name: the job file name
2185
  @type content: str
2186
  @param content: the new job contents
2187
  @rtype: boolean
2188
  @return: the success of the operation
2189

2190
  """
2191
  if not _IsJobQueueFile(file_name):
2192
    return False
2193

    
2194
  # Write and replace the file atomically
2195
  utils.WriteFile(file_name, data=_Decompress(content))
2196

    
2197
  return True
2198

    
2199

    
2200
def JobQueueRename(old, new):
2201
  """Renames a job queue file.
2202

2203
  This is just a wrapper over os.rename with proper checking.
2204

2205
  @type old: str
2206
  @param old: the old (actual) file name
2207
  @type new: str
2208
  @param new: the desired file name
2209
  @rtype: boolean
2210
  @return: the success of the operation
2211

2212
  """
2213
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2214
    return False
2215

    
2216
  utils.RenameFile(old, new, mkdir=True)
2217

    
2218
  return True
2219

    
2220

    
2221
def JobQueueSetDrainFlag(drain_flag):
2222
  """Set the drain flag for the queue.
2223

2224
  This will set or unset the queue drain flag.
2225

2226
  @type drain_flag: boolean
2227
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
2228
  @rtype: boolean
2229
  @return: always True
2230
  @warning: the function always returns True
2231

2232
  """
2233
  if drain_flag:
2234
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2235
  else:
2236
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2237

    
2238
  return True
2239

    
2240

    
2241
def BlockdevClose(instance_name, disks):
2242
  """Closes the given block devices.
2243

2244
  This means they will be switched to secondary mode (in case of
2245
  DRBD).
2246

2247
  @param instance_name: if the argument is not empty, the symlinks
2248
      of this instance will be removed
2249
  @type disks: list of L{objects.Disk}
2250
  @param disks: the list of disks to be closed
2251
  @rtype: tuple (success, message)
2252
  @return: a tuple of success and message, where success
2253
      indicates the succes of the operation, and message
2254
      which will contain the error details in case we
2255
      failed
2256

2257
  """
2258
  bdevs = []
2259
  for cf in disks:
2260
    rd = _RecursiveFindBD(cf)
2261
    if rd is None:
2262
      _Fail("Can't find device %s", cf)
2263
    bdevs.append(rd)
2264

    
2265
  msg = []
2266
  for rd in bdevs:
2267
    try:
2268
      rd.Close()
2269
    except errors.BlockDeviceError, err:
2270
      msg.append(str(err))
2271
  if msg:
2272
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
2273
  else:
2274
    if instance_name:
2275
      _RemoveBlockDevLinks(instance_name, disks)
2276
    return (True, "All devices secondary")
2277

    
2278

    
2279
def ValidateHVParams(hvname, hvparams):
2280
  """Validates the given hypervisor parameters.
2281

2282
  @type hvname: string
2283
  @param hvname: the hypervisor name
2284
  @type hvparams: dict
2285
  @param hvparams: the hypervisor parameters to be validated
2286
  @rtype: tuple (success, message)
2287
  @return: a tuple of success and message, where success
2288
      indicates the succes of the operation, and message
2289
      which will contain the error details in case we
2290
      failed
2291

2292
  """
2293
  try:
2294
    hv_type = hypervisor.GetHypervisor(hvname)
2295
    hv_type.ValidateParameters(hvparams)
2296
    return (True, "Validation passed")
2297
  except errors.HypervisorError, err:
2298
    return (False, str(err))
2299

    
2300

    
2301
def DemoteFromMC():
2302
  """Demotes the current node from master candidate role.
2303

2304
  """
2305
  # try to ensure we're not the master by mistake
2306
  master, myself = ssconf.GetMasterAndMyself()
2307
  if master == myself:
2308
    return (False, "ssconf status shows I'm the master node, will not demote")
2309
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2310
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2311
    return (False, "The master daemon is running, will not demote")
2312
  try:
2313
    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2314
  except EnvironmentError, err:
2315
    if err.errno != errno.ENOENT:
2316
      return (False, "Error while backing up cluster file: %s" % str(err))
2317
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2318
  return (True, "Done")
2319

    
2320

    
2321
def _FindDisks(nodes_ip, disks):
2322
  """Sets the physical ID on disks and returns the block devices.
2323

2324
  """
2325
  # set the correct physical ID
2326
  my_name = utils.HostInfo().name
2327
  for cf in disks:
2328
    cf.SetPhysicalID(my_name, nodes_ip)
2329

    
2330
  bdevs = []
2331

    
2332
  for cf in disks:
2333
    rd = _RecursiveFindBD(cf)
2334
    if rd is None:
2335
      return (False, "Can't find device %s" % cf)
2336
    bdevs.append(rd)
2337
  return (True, bdevs)
2338

    
2339

    
2340
def DrbdDisconnectNet(nodes_ip, disks):
2341
  """Disconnects the network on a list of drbd devices.
2342

2343
  """
2344
  status, bdevs = _FindDisks(nodes_ip, disks)
2345
  if not status:
2346
    return status, bdevs
2347

    
2348
  # disconnect disks
2349
  for rd in bdevs:
2350
    try:
2351
      rd.DisconnectNet()
2352
    except errors.BlockDeviceError, err:
2353
      _Fail("Can't change network configuration to standalone mode: %s",
2354
            err, exc=True)
2355
  return (True, "All disks are now disconnected")
2356

    
2357

    
2358
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2359
  """Attaches the network on a list of drbd devices.
2360

2361
  """
2362
  status, bdevs = _FindDisks(nodes_ip, disks)
2363
  if not status:
2364
    return status, bdevs
2365

    
2366
  if multimaster:
2367
    for idx, rd in enumerate(bdevs):
2368
      try:
2369
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2370
      except EnvironmentError, err:
2371
        _Fail("Can't create symlink: %s", err)
2372
  # reconnect disks, switch to new master configuration and if
2373
  # needed primary mode
2374
  for rd in bdevs:
2375
    try:
2376
      rd.AttachNet(multimaster)
2377
    except errors.BlockDeviceError, err:
2378
      _Fail("Can't change network configuration: %s", err)
2379
  # wait until the disks are connected; we need to retry the re-attach
2380
  # if the device becomes standalone, as this might happen if the one
2381
  # node disconnects and reconnects in a different mode before the
2382
  # other node reconnects; in this case, one or both of the nodes will
2383
  # decide it has wrong configuration and switch to standalone
2384
  RECONNECT_TIMEOUT = 2 * 60
2385
  sleep_time = 0.100 # start with 100 miliseconds
2386
  timeout_limit = time.time() + RECONNECT_TIMEOUT
2387
  while time.time() < timeout_limit:
2388
    all_connected = True
2389
    for rd in bdevs:
2390
      stats = rd.GetProcStatus()
2391
      if not (stats.is_connected or stats.is_in_resync):
2392
        all_connected = False
2393
      if stats.is_standalone:
2394
        # peer had different config info and this node became
2395
        # standalone, even though this should not happen with the
2396
        # new staged way of changing disk configs
2397
        try:
2398
          rd.ReAttachNet(multimaster)
2399
        except errors.BlockDeviceError, err:
2400
          _Fail("Can't change network configuration: %s", err)
2401
    if all_connected:
2402
      break
2403
    time.sleep(sleep_time)
2404
    sleep_time = min(5, sleep_time * 1.5)
2405
  if not all_connected:
2406
    return (False, "Timeout in disk reconnecting")
2407
  if multimaster:
2408
    # change to primary mode
2409
    for rd in bdevs:
2410
      try:
2411
        rd.Open()
2412
      except errors.BlockDeviceError, err:
2413
        _Fail("Can't change to primary mode: %s", err)
2414
  if multimaster:
2415
    msg = "multi-master and primary"
2416
  else:
2417
    msg = "single-master"
2418
  return (True, "Disks are now configured as %s" % msg)
2419

    
2420

    
2421
def DrbdWaitSync(nodes_ip, disks):
2422
  """Wait until DRBDs have synchronized.
2423

2424
  """
2425
  status, bdevs = _FindDisks(nodes_ip, disks)
2426
  if not status:
2427
    return status, bdevs
2428

    
2429
  min_resync = 100
2430
  alldone = True
2431
  failure = False
2432
  for rd in bdevs:
2433
    stats = rd.GetProcStatus()
2434
    if not (stats.is_connected or stats.is_in_resync):
2435
      failure = True
2436
      break
2437
    alldone = alldone and (not stats.is_in_resync)
2438
    if stats.sync_percent is not None:
2439
      min_resync = min(min_resync, stats.sync_percent)
2440
  return (not failure, (alldone, min_resync))
2441

    
2442

    
2443
def PowercycleNode(hypervisor_type):
2444
  """Hard-powercycle the node.
2445

2446
  Because we need to return first, and schedule the powercycle in the
2447
  background, we won't be able to report failures nicely.
2448

2449
  """
2450
  hyper = hypervisor.GetHypervisor(hypervisor_type)
2451
  try:
2452
    pid = os.fork()
2453
  except OSError, err:
2454
    # if we can't fork, we'll pretend that we're in the child process
2455
    pid = 0
2456
  if pid > 0:
2457
    return (True, "Reboot scheduled in 5 seconds")
2458
  time.sleep(5)
2459
  hyper.PowercycleNode()
2460

    
2461

    
2462
class HooksRunner(object):
2463
  """Hook runner.
2464

2465
  This class is instantiated on the node side (ganeti-noded) and not
2466
  on the master side.
2467

2468
  """
2469
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2470

    
2471
  def __init__(self, hooks_base_dir=None):
2472
    """Constructor for hooks runner.
2473

2474
    @type hooks_base_dir: str or None
2475
    @param hooks_base_dir: if not None, this overrides the
2476
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
2477

2478
    """
2479
    if hooks_base_dir is None:
2480
      hooks_base_dir = constants.HOOKS_BASE_DIR
2481
    self._BASE_DIR = hooks_base_dir
2482

    
2483
  @staticmethod
2484
  def ExecHook(script, env):
2485
    """Exec one hook script.
2486

2487
    @type script: str
2488
    @param script: the full path to the script
2489
    @type env: dict
2490
    @param env: the environment with which to exec the script
2491
    @rtype: tuple (success, message)
2492
    @return: a tuple of success and message, where success
2493
        indicates the succes of the operation, and message
2494
        which will contain the error details in case we
2495
        failed
2496

2497
    """
2498
    # exec the process using subprocess and log the output
2499
    fdstdin = None
2500
    try:
2501
      fdstdin = open("/dev/null", "r")
2502
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2503
                               stderr=subprocess.STDOUT, close_fds=True,
2504
                               shell=False, cwd="/", env=env)
2505
      output = ""
2506
      try:
2507
        output = child.stdout.read(4096)
2508
        child.stdout.close()
2509
      except EnvironmentError, err:
2510
        output += "Hook script error: %s" % str(err)
2511

    
2512
      while True:
2513
        try:
2514
          result = child.wait()
2515
          break
2516
        except EnvironmentError, err:
2517
          if err.errno == errno.EINTR:
2518
            continue
2519
          raise
2520
    finally:
2521
      # try not to leak fds
2522
      for fd in (fdstdin, ):
2523
        if fd is not None:
2524
          try:
2525
            fd.close()
2526
          except EnvironmentError, err:
2527
            # just log the error
2528
            #logging.exception("Error while closing fd %s", fd)
2529
            pass
2530

    
2531
    return result == 0, utils.SafeEncode(output.strip())
2532

    
2533
  def RunHooks(self, hpath, phase, env):
2534
    """Run the scripts in the hooks directory.
2535

2536
    @type hpath: str
2537
    @param hpath: the path to the hooks directory which
2538
        holds the scripts
2539
    @type phase: str
2540
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
2541
        L{constants.HOOKS_PHASE_POST}
2542
    @type env: dict
2543
    @param env: dictionary with the environment for the hook
2544
    @rtype: list
2545
    @return: list of 3-element tuples:
2546
      - script path
2547
      - script result, either L{constants.HKR_SUCCESS} or
2548
        L{constants.HKR_FAIL}
2549
      - output of the script
2550

2551
    @raise errors.ProgrammerError: for invalid input
2552
        parameters
2553

2554
    """
2555
    if phase == constants.HOOKS_PHASE_PRE:
2556
      suffix = "pre"
2557
    elif phase == constants.HOOKS_PHASE_POST:
2558
      suffix = "post"
2559
    else:
2560
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2561
    rr = []
2562

    
2563
    subdir = "%s-%s.d" % (hpath, suffix)
2564
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2565
    try:
2566
      dir_contents = utils.ListVisibleFiles(dir_name)
2567
    except OSError, err:
2568
      # FIXME: must log output in case of failures
2569
      return rr
2570

    
2571
    # we use the standard python sort order,
2572
    # so 00name is the recommended naming scheme
2573
    dir_contents.sort()
2574
    for relname in dir_contents:
2575
      fname = os.path.join(dir_name, relname)
2576
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2577
          self.RE_MASK.match(relname) is not None):
2578
        rrval = constants.HKR_SKIP
2579
        output = ""
2580
      else:
2581
        result, output = self.ExecHook(fname, env)
2582
        if not result:
2583
          rrval = constants.HKR_FAIL
2584
        else:
2585
          rrval = constants.HKR_SUCCESS
2586
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
2587

    
2588
    return rr
2589

    
2590

    
2591
class IAllocatorRunner(object):
2592
  """IAllocator runner.
2593

2594
  This class is instantiated on the node side (ganeti-noded) and not on
2595
  the master side.
2596

2597
  """
2598
  def Run(self, name, idata):
2599
    """Run an iallocator script.
2600

2601
    @type name: str
2602
    @param name: the iallocator script name
2603
    @type idata: str
2604
    @param idata: the allocator input data
2605

2606
    @rtype: tuple
2607
    @return: four element tuple of:
2608
       - run status (one of the IARUN_ constants)
2609
       - stdout
2610
       - stderr
2611
       - fail reason (as from L{utils.RunResult})
2612

2613
    """
2614
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2615
                                  os.path.isfile)
2616
    if alloc_script is None:
2617
      return (constants.IARUN_NOTFOUND, None, None, None)
2618

    
2619
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2620
    try:
2621
      os.write(fd, idata)
2622
      os.close(fd)
2623
      result = utils.RunCmd([alloc_script, fin_name])
2624
      if result.failed:
2625
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2626
                result.fail_reason)
2627
    finally:
2628
      os.unlink(fin_name)
2629

    
2630
    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2631

    
2632

    
2633
class DevCacheManager(object):
2634
  """Simple class for managing a cache of block device information.
2635

2636
  """
2637
  _DEV_PREFIX = "/dev/"
2638
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2639

    
2640
  @classmethod
2641
  def _ConvertPath(cls, dev_path):
2642
    """Converts a /dev/name path to the cache file name.
2643

2644
    This replaces slashes with underscores and strips the /dev
2645
    prefix. It then returns the full path to the cache file.
2646

2647
    @type dev_path: str
2648
    @param dev_path: the C{/dev/} path name
2649
    @rtype: str
2650
    @return: the converted path name
2651

2652
    """
2653
    if dev_path.startswith(cls._DEV_PREFIX):
2654
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2655
    dev_path = dev_path.replace("/", "_")
2656
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2657
    return fpath
2658

    
2659
  @classmethod
2660
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2661
    """Updates the cache information for a given device.
2662

2663
    @type dev_path: str
2664
    @param dev_path: the pathname of the device
2665
    @type owner: str
2666
    @param owner: the owner (instance name) of the device
2667
    @type on_primary: bool
2668
    @param on_primary: whether this is the primary
2669
        node nor not
2670
    @type iv_name: str
2671
    @param iv_name: the instance-visible name of the
2672
        device, as in objects.Disk.iv_name
2673

2674
    @rtype: None
2675

2676
    """
2677
    if dev_path is None:
2678
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2679
      return
2680
    fpath = cls._ConvertPath(dev_path)
2681
    if on_primary:
2682
      state = "primary"
2683
    else:
2684
      state = "secondary"
2685
    if iv_name is None:
2686
      iv_name = "not_visible"
2687
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2688
    try:
2689
      utils.WriteFile(fpath, data=fdata)
2690
    except EnvironmentError, err:
2691
      logging.exception("Can't update bdev cache for %s", dev_path)
2692

    
2693
  @classmethod
2694
  def RemoveCache(cls, dev_path):
2695
    """Remove data for a dev_path.
2696

2697
    This is just a wrapper over L{utils.RemoveFile} with a converted
2698
    path name and logging.
2699

2700
    @type dev_path: str
2701
    @param dev_path: the pathname of the device
2702

2703
    @rtype: None
2704

2705
    """
2706
    if dev_path is None:
2707
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2708
      return
2709
    fpath = cls._ConvertPath(dev_path)
2710
    try:
2711
      utils.RemoveFile(fpath)
2712
    except EnvironmentError, err:
2713
      logging.exception("Can't update bdev cache for %s", dev_path)