lib/backend.py @ 691744c4

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function

"""


import os
import os.path
import shutil
import time
import stat
import errno
import re
import subprocess
import random
import logging
import tempfile
import zlib
import base64

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")


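# Illustrative sketch (not part of the original module): how an RPC client
# could build the (encoding, content) pairs accepted by _Decompress above.
# The helper name and the size threshold are assumptions made for this
# example only; constants, zlib and base64 are the modules already imported
# at the top of this file.
def _ExampleCompressForRpc(data, threshold=4096):
  """Return a payload in the format expected by _Decompress."""
  if len(data) < threshold:
    # small payloads are cheaper to send uncompressed
    return (constants.RPC_ENCODING_NONE, data)
  return (constants.RPC_ENCODING_ZLIB_BASE64,
          base64.b64encode(zlib.compress(data)))

# e.g. _Decompress(_ExampleCompressForRpc("x" * 10000)) == "x" * 10000

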
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  return frozenset([
      constants.CLUSTER_CONF_FILE,
      constants.ETC_HOSTS,
      constants.SSH_KNOWN_HOSTS_FILE,
      constants.VNC_PASSWORD_FILE,
      ])


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: (master_netdev, master_ip, master_name) if we have a good
      configuration, otherwise (None, None, None)

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError:
    logging.exception("Cluster configuration incomplete")
    return (None, None, None)
  return (master_netdev, master_ip, master_node)


def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try to activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    daemons_params = {
        'ganeti-masterd': [],
        'ganeti-rapi': [],
        }
    if no_voting:
      daemons_params['ganeti-masterd'].append('--no-voting')
      daemons_params['ganeti-masterd'].append('--yes-do-it')
    for daemon in daemons_params:
      cmd = [daemon]
      cmd.extend(daemons_params[daemon])
      result = utils.RunCmd(cmd)
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok


def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True


def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the user's authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: boolean
  @return: the success of the operation

  """
  sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
               (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
               (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
               (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    msg = "Error while processing user ssh files"
    logging.exception(msg)
    return (False, "%s: %s" % (msg, err))

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])

  return (True, "Node added successfully")


def LeaveCluster():
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  f = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')


def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total amount of ram in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  f = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = f.read(128).rstrip("\n")
  finally:
    f.close()

  return outputarray


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = ListVolumeGroups()

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  return result


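# Illustrative sketch (not part of the original module): the shape of the
# "what" argument a caller might pass to VerifyNode above. The node name is
# made up; the keys and value types follow the docstring and code above.
_EXAMPLE_VERIFY_REQUEST = {
  constants.NV_FILELIST: [constants.CLUSTER_CONF_FILE],
  constants.NV_NODELIST: ["node2.example.com"],
  constants.NV_VERSION: None,
  }
# VerifyNode(_EXAMPLE_VERIFY_REQUEST, "cluster.example.com") would return a
# dict with the same keys: file checksums, ssh check failures (if any) and
# the (protocol, release) version pair.

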
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs


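# Illustrative sketch (not part of the original module): how one line of the
# "lvs" output above ends up in the dictionary returned by GetVolumeList.
# The helper name and the sample line are made up for this example; the real
# parsing is done by the regular expression in GetVolumeList.
def _ExampleParseLvsLine(line):
  """Parse a single "name|size|attr" line the way GetVolumeList does."""
  name, size, attr = line.strip().split("|")
  return {name: (size, attr[4] == '-', attr[5] == 'o')}

# _ExampleParseLvsLine("  test1|20.06|-wi-ao")
#   == {'test1': ('20.06', False, True)}

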
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume group name and values the
      size of the volume group

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return []

  def parse_dev(dev):
    if '(' in dev:
      return dev.split('(')[0]
    else:
      return dev

  def map_line(line):
    return {
      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),
    }

  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]


580
  """Check if a list of bridges exist on the current node.
581

582
  @rtype: boolean
583
  @return: C{True} if all of them exist, C{False} otherwise
584

585
  """
586
  for bridge in bridges_list:
587
    if not utils.BridgeExists(bridge):
588
      return False
589

    
590
  return True
591

    
592

    
593
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError:
      logging.exception("Error enumerating instances for hypervisor %s", hname)
      raise

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if instance.name not in hyper.ListInstances():
    return (False, 'not running')

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
    if not os.path.islink(link_name):
      return (False, 'not restarted since ganeti 1.2.5')

  return (True, '')


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, inst_id, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              raise errors.HypervisorError("Instance %s is running twice"
                                           " with different parameters" % name)
        output[name] = value

  return output


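# Illustrative sketch (not part of the original module): the shape of the
# dictionary returned by GetAllInstancesInfo above; the instance name and
# the numbers are made up for this example.
_EXAMPLE_ALL_INSTANCES_INFO = {
  "instance1.example.com": {
    'memory': 512,
    'vcpus': 1,
    'state': 'r-----',
    'time': 123.4,
    },
  }

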
def InstanceOsAdd(instance):
711
  """Add an OS to an instance.
712

713
  @type instance: L{objects.Instance}
714
  @param instance: Instance whose OS is to be installed
715
  @rtype: boolean
716
  @return: the success of the operation
717

718
  """
719
  try:
720
    inst_os = OSFromDisk(instance.os)
721
  except errors.InvalidOS, err:
722
    os_name, os_dir, os_err = err.args
723
    if os_dir is None:
724
      return (False, "Can't find OS '%s': %s" % (os_name, os_err))
725
    else:
726
      return (False, "Error parsing OS '%s' in directory %s: %s" %
727
              (os_name, os_dir, os_err))
728

    
729
  create_env = OSEnvironment(instance)
730

    
731
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
732
                                     instance.name, int(time.time()))
733

    
734
  result = utils.RunCmd([inst_os.create_script], env=create_env,
735
                        cwd=inst_os.path, output=logfile,)
736
  if result.failed:
737
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
738
                  " output: %s", result.cmd, result.fail_reason, logfile,
739
                  result.output)
740
    lines = [utils.SafeEncode(val)
741
             for val in utils.TailFile(logfile, lines=20)]
742
    return (False, "OS create script failed (%s), last lines in the"
743
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
744

    
745
  return (True, "Successfully installed")
746

    
747

    
748
def RunRenameInstance(instance, old_name):
749
  """Run the OS rename script for an instance.
750

751
  @type instance: L{objects.Instance}
752
  @param instance: Instance whose OS is to be installed
753
  @type old_name: string
754
  @param old_name: previous instance name
755
  @rtype: boolean
756
  @return: the success of the operation
757

758
  """
759
  inst_os = OSFromDisk(instance.os)
760

    
761
  rename_env = OSEnvironment(instance)
762
  rename_env['OLD_INSTANCE_NAME'] = old_name
763

    
764
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
765
                                           old_name,
766
                                           instance.name, int(time.time()))
767

    
768
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
769
                        cwd=inst_os.path, output=logfile)
770

    
771
  if result.failed:
772
    logging.error("os create command '%s' returned error: %s output: %s",
773
                  result.cmd, result.fail_reason, result.output)
774
    lines = [utils.SafeEncode(val)
775
             for val in utils.TailFile(logfile, lines=20)]
776
    return (False, "OS rename script failed (%s), last lines in the"
777
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
778

    
779
  return (True, "Rename successful")
780

    
781

    
782
def _GetVGInfo(vg_name):
783
  """Get information about the volume group.
784

785
  @type vg_name: str
786
  @param vg_name: the volume group which we query
787
  @rtype: dict
788
  @return:
789
    A dictionary with the following keys:
790
      - C{vg_size} is the total size of the volume group in MiB
791
      - C{vg_free} is the free size of the volume group in MiB
792
      - C{pv_count} is the number of physical disks in that VG
793

794
    If an error occurs during gathering of data, we return the same dict
795
    with keys all set to None.
796

797
  """
798
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
799

    
800
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
801
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
802

    
803
  if retval.failed:
804
    logging.error("volume group %s not present", vg_name)
805
    return retdic
806
  valarr = retval.stdout.strip().rstrip(':').split(':')
807
  if len(valarr) == 3:
808
    try:
809
      retdic = {
810
        "vg_size": int(round(float(valarr[0]), 0)),
811
        "vg_free": int(round(float(valarr[1]), 0)),
812
        "pv_count": int(valarr[2]),
813
        }
814
    except (TypeError, ValueError), err:
815
      logging.exception("Fail to parse vgs output")
816
  else:
817
    logging.error("vgs output has the wrong number of fields (expected"
818
                  " three): %s", str(valarr))
819
  return retdic
820

    
821

    
822
def _GetBlockDevSymlinkPath(instance_name, idx):
823
  return os.path.join(constants.DISK_LINKS_DIR,
824
                      "%s:%d" % (instance_name, idx))
825

    
826

    
827
def _SymlinkBlockDev(instance_name, device_path, idx):
828
  """Set up symlinks to a instance's block device.
829

830
  This is an auxiliary function run when an instance is started (on the primary
831
  node) or when an instance is migrated (on the target node).
832

833

834
  @param instance_name: the name of the target instance
835
  @param device_path: path of the physical block device, on the node
836
  @param idx: the disk index
837
  @return: absolute path to the disk's symlink
838

839
  """
840
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
841
  try:
842
    os.symlink(device_path, link_name)
843
  except OSError, err:
844
    if err.errno == errno.EEXIST:
845
      if (not os.path.islink(link_name) or
846
          os.readlink(link_name) != device_path):
847
        os.remove(link_name)
848
        os.symlink(device_path, link_name)
849
    else:
850
      raise
851

    
852
  return link_name
853

    
854

    
855
def _RemoveBlockDevLinks(instance_name, disks):
856
  """Remove the block device symlinks belonging to the given instance.
857

858
  """
859
  for idx, disk in enumerate(disks):
860
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
861
    if os.path.islink(link_name):
862
      try:
863
        os.remove(link_name)
864
      except OSError:
865
        logging.exception("Can't remove symlink '%s'", link_name)
866

    
867

    
868
def _GatherAndLinkBlockDevs(instance):
869
  """Set up an instance's block device(s).
870

871
  This is run on the primary node at instance startup. The block
872
  devices must be already assembled.
873

874
  @type instance: L{objects.Instance}
875
  @param instance: the instance whose disks we should assemble
876
  @rtype: list
877
  @return: list of (disk_object, device_path)
878

879
  """
880
  block_devices = []
881
  for idx, disk in enumerate(instance.disks):
882
    device = _RecursiveFindBD(disk)
883
    if device is None:
884
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
885
                                    str(disk))
886
    device.Open()
887
    try:
888
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
889
    except OSError, e:
890
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
891
                                    e.strerror)
892

    
893
    block_devices.append((disk, link_name))
894

    
895
  return block_devices
896

    
897

    
898
def StartInstance(instance):
899
  """Start an instance.
900

901
  @type instance: L{objects.Instance}
902
  @param instance: the instance object
903
  @rtype: boolean
904
  @return: whether the startup was successful or not
905

906
  """
907
  running_instances = GetInstanceList([instance.hypervisor])
908

    
909
  if instance.name in running_instances:
910
    return (True, "Already running")
911

    
912
  try:
913
    block_devices = _GatherAndLinkBlockDevs(instance)
914
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
915
    hyper.StartInstance(instance, block_devices)
916
  except errors.BlockDeviceError, err:
917
    logging.exception("Failed to start instance")
918
    return (False, "Block device error: %s" % str(err))
919
  except errors.HypervisorError, err:
920
    logging.exception("Failed to start instance")
921
    _RemoveBlockDevLinks(instance.name, instance.disks)
922
    return (False, "Hypervisor error: %s" % str(err))
923

    
924
  return (True, "Instance started successfully")
925

    
926

    
927
def InstanceShutdown(instance):
928
  """Shut an instance down.
929

930
  @note: this function uses polling with a hardcoded timeout.
931

932
  @type instance: L{objects.Instance}
933
  @param instance: the instance object
934
  @rtype: boolean
935
  @return: whether the shutdown was successful or not
936

937
  """
938
  hv_name = instance.hypervisor
939
  running_instances = GetInstanceList([hv_name])
940

    
941
  if instance.name not in running_instances:
942
    return (True, "Instance already stopped")
943

    
944
  hyper = hypervisor.GetHypervisor(hv_name)
945
  try:
946
    hyper.StopInstance(instance)
947
  except errors.HypervisorError, err:
948
    msg = "Failed to stop instance %s: %s" % (instance.name, err)
949
    logging.error(msg)
950
    return (False, msg)
951

    
952
  # test every 10secs for 2min
953

    
954
  time.sleep(1)
955
  for _ in range(11):
956
    if instance.name not in GetInstanceList([hv_name]):
957
      break
958
    time.sleep(10)
959
  else:
960
    # the shutdown did not succeed
961
    logging.error("Shutdown of '%s' unsuccessful, using destroy",
962
                  instance.name)
963

    
964
    try:
965
      hyper.StopInstance(instance, force=True)
966
    except errors.HypervisorError, err:
967
      msg = "Failed to force stop instance %s: %s" % (instance.name, err)
968
      logging.error(msg)
969
      return (False, msg)
970

    
971
    time.sleep(1)
972
    if instance.name in GetInstanceList([hv_name]):
973
      msg = ("Could not shutdown instance %s even by destroy" %
974
             instance.name)
975
      logging.error(msg)
976
      return (False, msg)
977

    
978
  _RemoveBlockDevLinks(instance.name, instance.disks)
979

    
980
  return (True, "Instance has been shutdown successfully")
981

    
982

    
983
def InstanceReboot(instance, reboot_type):
984
  """Reboot an instance.
985

986
  @type instance: L{objects.Instance}
987
  @param instance: the instance object to reboot
988
  @type reboot_type: str
989
  @param reboot_type: the type of reboot, one of the following
990
    constants:
991
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
992
        instance OS, do not recreate the VM
993
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
994
        restart the VM (at the hypervisor level)
995
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
996
        not accepted here, since that mode is handled differently, in
997
        cmdlib, and translates into full stop and start of the
998
        instance (instead of a call_instance_reboot RPC)
999
  @rtype: boolean
1000
  @return: the success of the operation
1001

1002
  """
1003
  running_instances = GetInstanceList([instance.hypervisor])
1004

    
1005
  if instance.name not in running_instances:
1006
    msg = "Cannot reboot instance %s that is not running" % instance.name
1007
    logging.error(msg)
1008
    return (False, msg)
1009

    
1010
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1011
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1012
    try:
1013
      hyper.RebootInstance(instance)
1014
    except errors.HypervisorError, err:
1015
      msg = "Failed to soft reboot instance %s: %s" % (instance.name, err)
1016
      logging.error(msg)
1017
      return (False, msg)
1018
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1019
    try:
1020
      stop_result = InstanceShutdown(instance)
1021
      if not stop_result[0]:
1022
        return stop_result
1023
      return StartInstance(instance)
1024
    except errors.HypervisorError, err:
1025
      msg = "Failed to hard reboot instance %s: %s" % (instance.name, err)
1026
      logging.error(msg)
1027
      return (False, msg)
1028
  else:
1029
    return (False, "Invalid reboot_type received: %s" % (reboot_type,))
1030

    
1031
  return (True, "Reboot successful")
1032

    
1033

    
1034
def MigrationInfo(instance):
1035
  """Gather information about an instance to be migrated.
1036

1037
  @type instance: L{objects.Instance}
1038
  @param instance: the instance definition
1039

1040
  """
1041
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1042
  try:
1043
    info = hyper.MigrationInfo(instance)
1044
  except errors.HypervisorError, err:
1045
    msg = "Failed to fetch migration information"
1046
    logging.exception(msg)
1047
    return (False, '%s: %s' % (msg, err))
1048
  return (True, info)
1049

    
1050

    
1051
def AcceptInstance(instance, info, target):
1052
  """Prepare the node to accept an instance.
1053

1054
  @type instance: L{objects.Instance}
1055
  @param instance: the instance definition
1056
  @type info: string/data (opaque)
1057
  @param info: migration information, from the source node
1058
  @type target: string
1059
  @param target: target host (usually ip), on this node
1060

1061
  """
1062
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1063
  try:
1064
    hyper.AcceptInstance(instance, info, target)
1065
  except errors.HypervisorError, err:
1066
    msg = "Failed to accept instance"
1067
    logging.exception(msg)
1068
    return (False, '%s: %s' % (msg, err))
1069
  return (True, "Accept successful")
1070

    
1071

    
1072
def FinalizeMigration(instance, info, success):
1073
  """Finalize any preparation to accept an instance.
1074

1075
  @type instance: L{objects.Instance}
1076
  @param instance: the instance definition
1077
  @type info: string/data (opaque)
1078
  @param info: migration information, from the source node
1079
  @type success: boolean
1080
  @param success: whether the migration was a success or a failure
1081

1082
  """
1083
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1084
  try:
1085
    hyper.FinalizeMigration(instance, info, success)
1086
  except errors.HypervisorError, err:
1087
    msg = "Failed to finalize migration"
1088
    logging.exception(msg)
1089
    return (False, '%s: %s' % (msg, err))
1090
  return (True, "Migration Finalized")
1091

    
1092

    
1093
def MigrateInstance(instance, target, live):
1094
  """Migrates an instance to another node.
1095

1096
  @type instance: L{objects.Instance}
1097
  @param instance: the instance definition
1098
  @type target: string
1099
  @param target: the target node name
1100
  @type live: boolean
1101
  @param live: whether the migration should be done live or not (the
1102
      interpretation of this parameter is left to the hypervisor)
1103
  @rtype: tuple
1104
  @return: a tuple of (success, msg) where:
1105
      - success is a boolean denoting the success/failure of the operation
1106
      - msg is a string with details in case of failure
1107

1108
  """
1109
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1110

    
1111
  try:
1112
    hyper.MigrateInstance(instance.name, target, live)
1113
  except errors.HypervisorError, err:
1114
    msg = "Failed to migrate instance"
1115
    logging.exception(msg)
1116
    return (False, "%s: %s" % (msg, err))
1117
  return (True, "Migration successful")
1118

    
1119
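# Illustrative sketch (not part of the original module): the instance and
# migration calls above all return a (status, message) pair; a hypothetical
# caller could reduce such a pair to a boolean while logging failures.
def _ExampleLogResult(result):
  """Log and unwrap a (status, message) pair returned by the calls above."""
  success, msg = result
  if not success:
    logging.error("Operation failed: %s", msg)
  return success

# e.g.: _ExampleLogResult(MigrateInstance(inst, "node2.example.com", True))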

    
1120
def BlockdevCreate(disk, size, owner, on_primary, info):
1121
  """Creates a block device for an instance.
1122

1123
  @type disk: L{objects.Disk}
1124
  @param disk: the object describing the disk we should create
1125
  @type size: int
1126
  @param size: the size of the physical underlying device, in MiB
1127
  @type owner: str
1128
  @param owner: the name of the instance for which disk is created,
1129
      used for device cache data
1130
  @type on_primary: boolean
1131
  @param on_primary:  indicates if it is the primary node or not
1132
  @type info: string
1133
  @param info: string that will be sent to the physical device
1134
      creation, used for example to set (LVM) tags on LVs
1135

1136
  @return: the new unique_id of the device (this can sometimes be
1137
      computed only after creation), or None. On secondary nodes,
1138
      it's not required to return anything.
1139

1140
  """
1141
  clist = []
1142
  if disk.children:
1143
    for child in disk.children:
1144
      try:
1145
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1146
      except errors.BlockDeviceError, err:
1147
        errmsg = "Can't assemble device %s: %s" % (child, err)
1148
        logging.error(errmsg)
1149
        return False, errmsg
1150
      if on_primary or disk.AssembleOnSecondary():
1151
        # we need the children open in case the device itself has to
1152
        # be assembled
1153
        try:
1154
          # pylint: disable-msg=E1103
1155
          crdev.Open()
1156
        except errors.BlockDeviceError, err:
1157
          errmsg = "Can't make child '%s' read-write: %s" % (child, err)
1158
          logging.error(errmsg)
1159
          return False, errmsg
1160
      clist.append(crdev)
1161

    
1162
  try:
1163
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1164
  except errors.BlockDeviceError, err:
1165
    return False, "Can't create block device: %s" % str(err)
1166

    
1167
  if on_primary or disk.AssembleOnSecondary():
1168
    try:
1169
      device.Assemble()
1170
    except errors.BlockDeviceError, err:
1171
      errmsg = ("Can't assemble device after creation, very"
1172
                " unusual event: %s" % str(err))
1173
      logging.error(errmsg)
1174
      return False, errmsg
1175
    device.SetSyncSpeed(constants.SYNC_SPEED)
1176
    if on_primary or disk.OpenOnSecondary():
1177
      try:
1178
        device.Open(force=True)
1179
      except errors.BlockDeviceError, err:
1180
        errmsg = ("Can't make device r/w after creation, very"
1181
                  " unusual event: %s" % str(err))
1182
        logging.error(errmsg)
1183
        return False, errmsg
1184
    DevCacheManager.UpdateCache(device.dev_path, owner,
1185
                                on_primary, disk.iv_name)
1186

    
1187
  device.SetInfo(info)
1188

    
1189
  physical_id = device.unique_id
1190
  return True, physical_id
1191

    
1192

    
1193
def BlockdevRemove(disk):
1194
  """Remove a block device.
1195

1196
  @note: This is intended to be called recursively.
1197

1198
  @type disk: L{objects.Disk}
1199
  @param disk: the disk object we should remove
1200
  @rtype: boolean
1201
  @return: the success of the operation
1202

1203
  """
1204
  msgs = []
1205
  result = True
1206
  try:
1207
    rdev = _RecursiveFindBD(disk)
1208
  except errors.BlockDeviceError, err:
1209
    # probably can't attach
1210
    logging.info("Can't attach to device %s in remove", disk)
1211
    rdev = None
1212
  if rdev is not None:
1213
    r_path = rdev.dev_path
1214
    try:
1215
      rdev.Remove()
1216
    except errors.BlockDeviceError, err:
1217
      msgs.append(str(err))
1218
      result = False
1219
    if result:
1220
      DevCacheManager.RemoveCache(r_path)
1221

    
1222
  if disk.children:
1223
    for child in disk.children:
1224
      c_status, c_msg = BlockdevRemove(child)
1225
      result = result and c_status
1226
      if c_msg: # not an empty message
1227
        msgs.append(c_msg)
1228

    
1229
  return (result, "; ".join(msgs))
1230

    
1231

    
1232
def _RecursiveAssembleBD(disk, owner, as_primary):
1233
  """Activate a block device for an instance.
1234

1235
  This is run on the primary and secondary nodes for an instance.
1236

1237
  @note: this function is called recursively.
1238

1239
  @type disk: L{objects.Disk}
1240
  @param disk: the disk we try to assemble
1241
  @type owner: str
1242
  @param owner: the name of the instance which owns the disk
1243
  @type as_primary: boolean
1244
  @param as_primary: if we should make the block device
1245
      read/write
1246

1247
  @return: the assembled device or None (in case no device
1248
      was assembled)
1249
  @raise errors.BlockDeviceError: in case there is an error
1250
      during the activation of the children or the device
1251
      itself
1252

1253
  """
1254
  children = []
1255
  if disk.children:
1256
    mcn = disk.ChildrenNeeded()
1257
    if mcn == -1:
1258
      mcn = 0 # max number of Nones allowed
1259
    else:
1260
      mcn = len(disk.children) - mcn # max number of Nones
1261
    for chld_disk in disk.children:
1262
      try:
1263
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1264
      except errors.BlockDeviceError, err:
1265
        if children.count(None) >= mcn:
1266
          raise
1267
        cdev = None
1268
        logging.error("Error in child activation (but continuing): %s",
1269
                      str(err))
1270
      children.append(cdev)
1271

    
1272
  if as_primary or disk.AssembleOnSecondary():
1273
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1274
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1275
    result = r_dev
1276
    if as_primary or disk.OpenOnSecondary():
1277
      r_dev.Open()
1278
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1279
                                as_primary, disk.iv_name)
1280

    
1281
  else:
1282
    result = True
1283
  return result
1284

    
1285

    
1286
def BlockdevAssemble(disk, owner, as_primary):
1287
  """Activate a block device for an instance.
1288

1289
  This is a wrapper over _RecursiveAssembleBD.
1290

1291
  @rtype: str or boolean
1292
  @return: a C{/dev/...} path for primary nodes, and
1293
      C{True} for secondary nodes
1294

1295
  """
1296
  status = True
1297
  result = "no error information"
1298
  try:
1299
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1300
    if isinstance(result, bdev.BlockDev):
1301
      # pylint: disable-msg=E1103
1302
      result = result.dev_path
1303
  except errors.BlockDeviceError, err:
1304
    result = "Error while assembling disk: %s" % str(err)
1305
    status = False
1306
  return (status, result)
1307

    
1308

    
1309
def BlockdevShutdown(disk):
1310
  """Shut down a block device.
1311

1312
  First, if the device is assembled (Attach() is successful), then
1313
  the device is shutdown. Then the children of the device are
1314
  shutdown.
1315

1316
  This function is called recursively. Note that we don't cache the
1317
  children or such, as opposed to assemble; shutdown of different
1318
  devices doesn't require that the upper device was active.
1319

1320
  @type disk: L{objects.Disk}
1321
  @param disk: the description of the disk we should
1322
      shutdown
1323
  @rtype: boolean
1324
  @return: the success of the operation
1325

1326
  """
1327
  msgs = []
1328
  result = True
1329
  r_dev = _RecursiveFindBD(disk)
1330
  if r_dev is not None:
1331
    r_path = r_dev.dev_path
1332
    try:
1333
      r_dev.Shutdown()
1334
      DevCacheManager.RemoveCache(r_path)
1335
    except errors.BlockDeviceError, err:
1336
      msgs.append(str(err))
1337
      result = False
1338

    
1339
  if disk.children:
1340
    for child in disk.children:
1341
      c_status, c_msg = BlockdevShutdown(child)
1342
      result = result and c_status
1343
      if c_msg: # not an empty message
1344
        msgs.append(c_msg)
1345

    
1346
  return (result, "; ".join(msgs))
1347

    
1348

    
1349
def BlockdevAddchildren(parent_cdev, new_cdevs):
1350
  """Extend a mirrored block device.
1351

1352
  @type parent_cdev: L{objects.Disk}
1353
  @param parent_cdev: the disk to which we should add children
1354
  @type new_cdevs: list of L{objects.Disk}
1355
  @param new_cdevs: the list of children which we should add
1356
  @rtype: boolean
1357
  @return: the success of the operation
1358

1359
  """
1360
  parent_bdev = _RecursiveFindBD(parent_cdev)
1361
  if parent_bdev is None:
1362
    logging.error("Can't find parent device")
1363
    return False
1364
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1365
  if new_bdevs.count(None) > 0:
1366
    logging.error("Can't find new device(s) to add: %s:%s",
1367
                  new_bdevs, new_cdevs)
1368
    return False
1369
  parent_bdev.AddChildren(new_bdevs)
1370
  return True
1371

    
1372

    
1373
def BlockdevRemovechildren(parent_cdev, new_cdevs):
1374
  """Shrink a mirrored block device.
1375

1376
  @type parent_cdev: L{objects.Disk}
1377
  @param parent_cdev: the disk from which we should remove children
1378
  @type new_cdevs: list of L{objects.Disk}
1379
  @param new_cdevs: the list of children which we should remove
1380
  @rtype: boolean
1381
  @return: the success of the operation
1382

1383
  """
1384
  parent_bdev = _RecursiveFindBD(parent_cdev)
1385
  if parent_bdev is None:
1386
    logging.error("Can't find parent in remove children: %s", parent_cdev)
1387
    return False
1388
  devs = []
1389
  for disk in new_cdevs:
1390
    rpath = disk.StaticDevPath()
1391
    if rpath is None:
1392
      bd = _RecursiveFindBD(disk)
1393
      if bd is None:
1394
        logging.error("Can't find dynamic device %s while removing children",
1395
                      disk)
1396
        return False
1397
      else:
1398
        devs.append(bd.dev_path)
1399
    else:
1400
      devs.append(rpath)
1401
  parent_bdev.RemoveChildren(devs)
1402
  return True
1403

    
1404

    
1405
def BlockdevGetmirrorstatus(disks):
1406
  """Get the mirroring status of a list of devices.
1407

1408
  @type disks: list of L{objects.Disk}
1409
  @param disks: the list of disks which we should query
1410
  @rtype: list
1411
  @return:
1412
      a list of (mirror_done, estimated_time) tuples, which
1413
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
1414
  @raise errors.BlockDeviceError: if any of the disks cannot be
1415
      found
1416

1417
  """
1418
  stats = []
1419
  for dsk in disks:
1420
    rbd = _RecursiveFindBD(dsk)
1421
    if rbd is None:
1422
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1423
    stats.append(rbd.CombinedSyncStatus())
1424
  return stats
1425

    
1426

    
1427
def _RecursiveFindBD(disk):
1428
  """Check if a device is activated.
1429

1430
  If so, return information about the real device.
1431

1432
  @type disk: L{objects.Disk}
1433
  @param disk: the disk object we need to find
1434

1435
  @return: None if the device can't be found,
1436
      otherwise the device instance
1437

1438
  """
1439
  children = []
1440
  if disk.children:
1441
    for chdisk in disk.children:
1442
      children.append(_RecursiveFindBD(chdisk))
1443

    
1444
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1445

    
1446

    
1447
def BlockdevFind(disk):
1448
  """Check if a device is activated.
1449

1450
  If it is, return information about the real device.
1451

1452
  @type disk: L{objects.Disk}
1453
  @param disk: the disk to find
1454
  @rtype: None or tuple
1455
  @return: None if the disk cannot be found, otherwise a
1456
      tuple (device_path, major, minor, sync_percent,
1457
      estimated_time, is_degraded)
1458

1459
  """
1460
  try:
1461
    rbd = _RecursiveFindBD(disk)
1462
  except errors.BlockDeviceError, err:
1463
    return (False, str(err))
1464
  if rbd is None:
1465
    return (True, None)
1466
  return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1467

    
1468

    
1469
def BlockdevGetsize(disks):
1470
  """Computes the size of the given disks.
1471

1472
  If a disk is not found, returns None instead.
1473

1474
  @type disks: list of L{objects.Disk}
1475
  @param disks: the list of disks to compute the size for
1476
  @rtype: list
1477
  @return: list with elements None if the disk cannot be found,
1478
      otherwise the size
1479

1480
  """
1481
  result = []
1482
  for cf in disks:
1483
    try:
1484
      rbd = _RecursiveFindBD(cf)
1485
    except errors.BlockDeviceError, err:
1486
      result.append(None)
1487
      continue
1488
    if rbd is None:
1489
      result.append(None)
1490
    else:
1491
      result.append(rbd.GetActualSize())
1492
  return result
1493

    
1494

    
1495
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1496
  """Write a file to the filesystem.
1497

1498
  This allows the master to overwrite(!) a file. It will only perform
1499
  the operation if the file belongs to a list of configuration files.
1500

1501
  @type file_name: str
1502
  @param file_name: the target file name
1503
  @type data: str
1504
  @param data: the new contents of the file
1505
  @type mode: int
1506
  @param mode: the mode to give the file (can be None)
1507
  @type uid: int
1508
  @param uid: the owner of the file (can be -1 for default)
1509
  @type gid: int
1510
  @param gid: the group of the file (can be -1 for default)
1511
  @type atime: float
1512
  @param atime: the atime to set on the file (can be None)
1513
  @type mtime: float
1514
  @param mtime: the mtime to set on the file (can be None)
1515
  @rtype: boolean
1516
  @return: the success of the operation; errors are logged
1517
      in the node daemon log
1518

1519
  """
1520
  if not os.path.isabs(file_name):
1521
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
1522
                  file_name)
1523
    return False
1524

    
1525
  if file_name not in _ALLOWED_UPLOAD_FILES:
1526
    logging.error("Filename passed to UploadFile not in allowed"
1527
                 " upload targets: '%s'", file_name)
1528
    return False
1529

    
1530
  raw_data = _Decompress(data)
1531

    
1532
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1533
                  atime=atime, mtime=mtime)
1534
  return True
1535

    
1536

    
1537
def WriteSsconfFiles(values):
1538
  """Update all ssconf files.
1539

1540
  Wrapper around the SimpleStore.WriteFiles.
1541

1542
  """
1543
  ssconf.SimpleStore().WriteFiles(values)
1544

    
1545

    
1546
def _ErrnoOrStr(err):
1547
  """Format an EnvironmentError exception.
1548

1549
  If the L{err} argument has an errno attribute, it will be looked up
1550
  and converted into a textual C{E...} description. Otherwise the
1551
  string representation of the error will be returned.
1552

1553
  @type err: L{EnvironmentError}
1554
  @param err: the exception to format
1555

1556
  """
1557
  if hasattr(err, 'errno'):
1558
    detail = errno.errorcode[err.errno]
1559
  else:
1560
    detail = str(err)
1561
  return detail
1562

    
1563

    
1564
def _OSOndiskVersion(name, os_dir):
1565
  """Compute and return the API version of a given OS.
1566

1567
  This function will try to read the API version of the OS given by
1568
  the 'name' parameter and residing in the 'os_dir' directory.
1569

1570
  @type name: str
1571
  @param name: the OS name we should look for
1572
  @type os_dir: str
1573
  @param os_dir: the directory in which we should look for the OS
  @rtype: list
  @return: the list of API versions supported by the OS
1578
  @raise errors.InvalidOS: if the OS cannot be found
1579

1580
  """
1581
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1582

    
1583
  try:
1584
    st = os.stat(api_file)
1585
  except EnvironmentError, err:
1586
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1587
                           " found (%s)" % _ErrnoOrStr(err))
1588

    
1589
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1590
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1591
                           " a regular file")
1592

    
1593
  try:
1594
    f = open(api_file)
1595
    try:
1596
      api_versions = f.readlines()
1597
    finally:
1598
      f.close()
1599
  except EnvironmentError, err:
1600
    raise errors.InvalidOS(name, os_dir, "error while reading the"
1601
                           " API version (%s)" % _ErrnoOrStr(err))
1602

    
1603
  api_versions = [version.strip() for version in api_versions]
1604
  try:
1605
    api_versions = [int(version) for version in api_versions]
1606
  except (TypeError, ValueError), err:
1607
    raise errors.InvalidOS(name, os_dir,
1608
                           "API version is not integer (%s)" % str(err))
1609

    
1610
  return api_versions
1611

    
1612
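# Illustrative sketch (not part of the original module): the
# "ganeti_api_version" file read by _OSOndiskVersion above holds one integer
# version per line; this hypothetical helper shows the same parsing applied
# to an in-memory string.
def _ExampleParseApiVersions(text):
  """Parse ganeti_api_version contents the way _OSOndiskVersion does."""
  return [int(version.strip()) for version in text.splitlines()]

# _ExampleParseApiVersions("10\n") == [10]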

    
1613
def DiagnoseOS(top_dirs=None):
1614
  """Compute the validity for all OSes.
1615

1616
  @type top_dirs: list
1617
  @param top_dirs: the list of directories in which to
1618
      search (if not given defaults to
1619
      L{constants.OS_SEARCH_PATH})
1620
  @rtype: list of L{objects.OS}
1621
  @return: an OS object for each name in all the given
1622
      directories
1623

1624
  """
1625
  if top_dirs is None:
1626
    top_dirs = constants.OS_SEARCH_PATH
1627

    
1628
  result = []
1629
  for dir_name in top_dirs:
1630
    if os.path.isdir(dir_name):
1631
      try:
1632
        f_names = utils.ListVisibleFiles(dir_name)
1633
      except EnvironmentError, err:
1634
        logging.exception("Can't list the OS directory %s", dir_name)
1635
        break
1636
      for name in f_names:
1637
        try:
1638
          os_inst = OSFromDisk(name, base_dir=dir_name)
1639
          result.append(os_inst)
1640
        except errors.InvalidOS, err:
1641
          result.append(objects.OS.FromInvalidOS(err))
1642

    
1643
  return result
1644

    
1645

    
1646
def OSFromDisk(name, base_dir=None):
1647
  """Create an OS instance from disk.
1648

1649
  This function will return an OS instance if the given name is a
1650
  valid OS name. Otherwise, it will raise an appropriate
1651
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1652

1653
  @type base_dir: string
1654
  @keyword base_dir: Base directory containing OS installations.
1655
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1656
  @rtype: L{objects.OS}
1657
  @return: the OS instance if we find a valid one
1658
  @raise errors.InvalidOS: if we don't find a valid OS
1659

1660
  """
1661
  if base_dir is None:
1662
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1663
  else:
1664
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1665

    
1666
  if os_dir is None:
1667
    raise errors.InvalidOS(name, None, "OS dir not found in search path")
1668

    
1669
  api_versions = _OSOndiskVersion(name, os_dir)
1670

    
1671
  if constants.OS_API_VERSION not in api_versions:
1672
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
1673
                           " (found %s want %s)"
1674
                           % (api_versions, constants.OS_API_VERSION))
1675

    
1676
  # OS Scripts dictionary, we will populate it with the actual script names
1677
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1678

    
1679
  for script in os_scripts:
1680
    os_scripts[script] = os.path.sep.join([os_dir, script])
1681

    
1682
    try:
1683
      st = os.stat(os_scripts[script])
1684
    except EnvironmentError, err:
1685
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1686
                             (script, _ErrnoOrStr(err)))
1687

    
1688
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1689
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1690
                             script)
1691

    
1692
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1693
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1694
                             script)
1695

    
1696

    
1697
  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1698
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1699
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1700
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1701
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1702
                    api_versions=api_versions)
1703

    
1704
def OSEnvironment(instance, debug=0):
1705
  """Calculate the environment for an os script.
1706

1707
  @type instance: L{objects.Instance}
1708
  @param instance: target instance for the os script run
1709
  @type debug: integer
1710
  @param debug: debug level (0 or 1, for OS Api 10)
1711
  @rtype: dict
1712
  @return: dict of environment variables
1713
  @raise errors.BlockDeviceError: if the block device
1714
      cannot be found
1715

1716
  """
1717
  result = {}
1718
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1719
  result['INSTANCE_NAME'] = instance.name
1720
  result['INSTANCE_OS'] = instance.os
1721
  result['HYPERVISOR'] = instance.hypervisor
1722
  result['DISK_COUNT'] = '%d' % len(instance.disks)
1723
  result['NIC_COUNT'] = '%d' % len(instance.nics)
1724
  result['DEBUG_LEVEL'] = '%d' % debug
1725
  for idx, disk in enumerate(instance.disks):
1726
    real_disk = _RecursiveFindBD(disk)
1727
    if real_disk is None:
1728
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
1729
                                    str(disk))
1730
    real_disk.Open()
1731
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
1732
    result['DISK_%d_ACCESS' % idx] = disk.mode
1733
    if constants.HV_DISK_TYPE in instance.hvparams:
1734
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
1735
        instance.hvparams[constants.HV_DISK_TYPE]
1736
    if disk.dev_type in constants.LDS_BLOCK:
1737
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1738
    elif disk.dev_type == constants.LD_FILE:
1739
      result['DISK_%d_BACKEND_TYPE' % idx] = \
1740
        'file:%s' % disk.physical_id[0]
1741
  for idx, nic in enumerate(instance.nics):
1742
    result['NIC_%d_MAC' % idx] = nic.mac
1743
    if nic.ip:
1744
      result['NIC_%d_IP' % idx] = nic.ip
1745
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
1746
    if constants.HV_NIC_TYPE in instance.hvparams:
1747
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
1748
        instance.hvparams[constants.HV_NIC_TYPE]
1749

    
1750
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1751
    for key, value in source.items():
1752
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1753

    
1754
  return result
1755

    
1756
def BlockdevGrow(disk, amount):
1757
  """Grow a stack of block devices.
1758

1759
  This function is called recursively, with the childrens being the
1760
  first ones to resize.
1761

1762
  @type disk: L{objects.Disk}
1763
  @param disk: the disk to be grown
1764
  @rtype: (status, result)
1765
  @return: a tuple with the status of the operation
1766
      (True/False), and the errors message if status
1767
      is False
1768

1769
  """
1770
  r_dev = _RecursiveFindBD(disk)
1771
  if r_dev is None:
1772
    return False, "Cannot find block device %s" % (disk,)
1773

    
1774
  try:
1775
    r_dev.Grow(amount)
1776
  except errors.BlockDeviceError, err:
1777
    return False, str(err)
1778

    
1779
  return True, None
1780

    
1781

    
1782
def BlockdevSnapshot(disk):
1783
  """Create a snapshot copy of a block device.
1784

1785
  This function is called recursively, and the snapshot is actually created
1786
  just for the leaf lvm backend device.
1787

1788
  @type disk: L{objects.Disk}
1789
  @param disk: the disk to be snapshotted
1790
  @rtype: string
1791
  @return: snapshot disk path
1792

1793
  """
1794
  if disk.children:
1795
    if len(disk.children) == 1:
1796
      # only one child, let's recurse on it
1797
      return BlockdevSnapshot(disk.children[0])
1798
    else:
1799
      # more than one child, choose one that matches
1800
      for child in disk.children:
1801
        if child.size == disk.size:
1802
          # return implies breaking the loop
1803
          return BlockdevSnapshot(child)
1804
  elif disk.dev_type == constants.LD_LV:
1805
    r_dev = _RecursiveFindBD(disk)
1806
    if r_dev is not None:
1807
      # let's stay on the safe side and ask for the full size, for now
1808
      return r_dev.Snapshot(disk.size)
1809
    else:
1810
      return None
1811
  else:
1812
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1813
                                 " '%s' of type '%s'" %
1814
                                 (disk.unique_id, disk.dev_type))
1815

    
1816

    
1817
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1818
  """Export a block device snapshot to a remote node.
1819

1820
  @type disk: L{objects.Disk}
1821
  @param disk: the description of the disk to export
1822
  @type dest_node: str
1823
  @param dest_node: the destination node to export to
1824
  @type instance: L{objects.Instance}
1825
  @param instance: the instance object to whom the disk belongs
1826
  @type cluster_name: str
1827
  @param cluster_name: the cluster name, needed for SSH hostalias
1828
  @type idx: int
1829
  @param idx: the index of the disk in the instance's disk list,
1830
      used to export to the OS scripts environment
1831
  @rtype: boolean
1832
  @return: the success of the operation
1833

1834
  """
1835
  export_env = OSEnvironment(instance)
1836

    
1837
  inst_os = OSFromDisk(instance.os)
1838
  export_script = inst_os.export_script
1839

    
1840
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1841
                                     instance.name, int(time.time()))
1842
  if not os.path.exists(constants.LOG_OS_DIR):
1843
    os.mkdir(constants.LOG_OS_DIR, 0750)
1844
  real_disk = _RecursiveFindBD(disk)
1845
  if real_disk is None:
1846
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1847
                                  str(disk))
1848
  real_disk.Open()
1849

    
1850
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
1851
  export_env['EXPORT_INDEX'] = str(idx)
1852

    
1853
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1854
  destfile = disk.physical_id[1]
1855

    
1856
  # the target command is built out of three individual commands,
1857
  # which are joined by pipes; we check each individual command for
1858
  # valid parameters
1859
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1860
                               inst_os.path, export_script, logfile)
1861

    
1862
  comprcmd = "gzip"
1863

    
1864
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1865
                                destdir, destdir, destfile)
1866
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1867
                                                   constants.GANETI_RUNAS,
1868
                                                   destcmd)
1869

    
1870
  # all commands have been checked, so we're safe to combine them
1871
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1872

    
1873
  result = utils.RunCmd(["bash", "-c", command], env=export_env)
1874

    
1875
  if result.failed:
1876
    logging.error("os snapshot export command '%s' returned error: %s"
1877
                  " output: %s", command, result.fail_reason, result.output)
1878
    return False
1879

    
1880
  return True
1881

    
1882

    
1883
def FinalizeExport(instance, snap_disks):
1884
  """Write out the export configuration information.
1885

1886
  @type instance: L{objects.Instance}
1887
  @param instance: the instance which we export, used for
1888
      saving configuration
1889
  @type snap_disks: list of L{objects.Disk}
1890
  @param snap_disks: list of snapshot block devices, which
1891
      will be used to get the actual name of the dump file
1892

1893
  @rtype: boolean
1894
  @return: the success of the operation
1895

1896
  """
1897
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1898
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1899

    
1900
  config = objects.SerializableConfigParser()
1901

    
1902
  config.add_section(constants.INISECT_EXP)
1903
  config.set(constants.INISECT_EXP, 'version', '0')
1904
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1905
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1906
  config.set(constants.INISECT_EXP, 'os', instance.os)
1907
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
1908

    
1909
  config.add_section(constants.INISECT_INS)
1910
  config.set(constants.INISECT_INS, 'name', instance.name)
1911
  config.set(constants.INISECT_INS, 'memory', '%d' %
1912
             instance.beparams[constants.BE_MEMORY])
1913
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
1914
             instance.beparams[constants.BE_VCPUS])
1915
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1916

    
1917
  nic_total = 0
1918
  for nic_count, nic in enumerate(instance.nics):
1919
    nic_total += 1
1920
    config.set(constants.INISECT_INS, 'nic%d_mac' %
1921
               nic_count, '%s' % nic.mac)
1922
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1923
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1924
               '%s' % nic.bridge)
1925
  # TODO: redundant: on load can read nics until it doesn't exist
1926
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1927

    
1928
  disk_total = 0
1929
  for disk_count, disk in enumerate(snap_disks):
1930
    if disk:
1931
      disk_total += 1
1932
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1933
                 ('%s' % disk.iv_name))
1934
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1935
                 ('%s' % disk.physical_id[1]))
1936
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1937
                 ('%d' % disk.size))
1938

    
1939
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1940

    
1941
  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1942
                  data=config.Dumps())
1943
  shutil.rmtree(finaldestdir, True)
1944
  shutil.move(destdir, finaldestdir)
1945

    
1946
  return True
1947

    
1948

    
1949
def ExportInfo(dest):
1950
  """Get export configuration information.
1951

1952
  @type dest: str
1953
  @param dest: directory containing the export
1954

1955
  @rtype: L{objects.SerializableConfigParser}
1956
  @return: a serializable config file containing the
1957
      export info
1958

1959
  """
1960
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1961

    
1962
  config = objects.SerializableConfigParser()
1963
  config.read(cff)
1964

    
1965
  if (not config.has_section(constants.INISECT_EXP) or
1966
      not config.has_section(constants.INISECT_INS)):
1967
    return None
1968

    
1969
  return config
1970

    
1971

    
1972
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1973
  """Import an os image into an instance.
1974

1975
  @type instance: L{objects.Instance}
1976
  @param instance: instance to import the disks into
1977
  @type src_node: string
1978
  @param src_node: source node for the disk images
1979
  @type src_images: list of string
1980
  @param src_images: absolute paths of the disk images
1981
  @rtype: list of boolean
1982
  @return: each boolean represent the success of importing the n-th disk
1983

1984
  """
1985
  import_env = OSEnvironment(instance)
1986
  inst_os = OSFromDisk(instance.os)
1987
  import_script = inst_os.import_script
1988

    
1989
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1990
                                        instance.name, int(time.time()))
1991
  if not os.path.exists(constants.LOG_OS_DIR):
1992
    os.mkdir(constants.LOG_OS_DIR, 0750)
1993

    
1994
  comprcmd = "gunzip"
1995
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1996
                               import_script, logfile)
1997

    
1998
  final_result = []
1999
  for idx, image in enumerate(src_images):
2000
    if image:
2001
      destcmd = utils.BuildShellCmd('cat %s', image)
2002
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2003
                                                       constants.GANETI_RUNAS,
2004
                                                       destcmd)
2005
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2006
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2007
      import_env['IMPORT_INDEX'] = str(idx)
2008
      result = utils.RunCmd(command, env=import_env)
2009
      if result.failed:
2010
        logging.error("Disk import command '%s' returned error: %s"
2011
                      " output: %s", command, result.fail_reason,
2012
                      result.output)
2013
        final_result.append(False)
2014
      else:
2015
        final_result.append(True)
2016
    else:
2017
      final_result.append(True)
2018

    
2019
  return final_result
2020

    
2021

    
2022
def ListExports():
2023
  """Return a list of exports currently available on this machine.
2024

2025
  @rtype: list
2026
  @return: list of the exports
2027

2028
  """
2029
  if os.path.isdir(constants.EXPORT_DIR):
2030
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
2031
  else:
2032
    return []
2033

    
2034

    
2035
def RemoveExport(export):
2036
  """Remove an existing export from the node.
2037

2038
  @type export: str
2039
  @param export: the name of the export to remove
2040
  @rtype: boolean
2041
  @return: the success of the operation
2042

2043
  """
2044
  target = os.path.join(constants.EXPORT_DIR, export)
2045

    
2046
  shutil.rmtree(target)
2047
  # TODO: catch some of the relevant exceptions and provide a pretty
2048
  # error message if rmtree fails.
2049

    
2050
  return True
2051

    
2052

    
2053
def BlockdevRename(devlist):
2054
  """Rename a list of block devices.
2055

2056
  @type devlist: list of tuples
2057
  @param devlist: list of tuples of the form  (disk,
2058
      new_logical_id, new_physical_id); disk is an
2059
      L{objects.Disk} object describing the current disk,
2060
      and new logical_id/physical_id is the name we
2061
      rename it to
2062
  @rtype: boolean
2063
  @return: True if all renames succeeded, False otherwise
2064

2065
  """
2066
  result = True
2067
  for disk, unique_id in devlist:
2068
    dev = _RecursiveFindBD(disk)
2069
    if dev is None:
2070
      result = False
2071
      continue
2072
    try:
2073
      old_rpath = dev.dev_path
2074
      dev.Rename(unique_id)
2075
      new_rpath = dev.dev_path
2076
      if old_rpath != new_rpath:
2077
        DevCacheManager.RemoveCache(old_rpath)
2078
        # FIXME: we should add the new cache information here, like:
2079
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2080
        # but we don't have the owner here - maybe parse from existing
2081
        # cache? for now, we only lose lvm data when we rename, which
2082
        # is less critical than DRBD or MD
2083
    except errors.BlockDeviceError:
2084
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2085
      result = False
2086
  return result
2087

    
2088

    
2089
def _TransformFileStorageDir(file_storage_dir):
2090
  """Checks whether given file_storage_dir is valid.
2091

2092
  Checks wheter the given file_storage_dir is within the cluster-wide
2093
  default file_storage_dir stored in SimpleStore. Only paths under that
2094
  directory are allowed.
2095

2096
  @type file_storage_dir: str
2097
  @param file_storage_dir: the path to check
2098

2099
  @return: the normalized path if valid, None otherwise
2100

2101
  """
2102
  cfg = _GetConfig()
2103
  file_storage_dir = os.path.normpath(file_storage_dir)
2104
  base_file_storage_dir = cfg.GetFileStorageDir()
2105
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2106
      base_file_storage_dir):
2107
    logging.error("file storage directory '%s' is not under base file"
2108
                  " storage directory '%s'",
2109
                  file_storage_dir, base_file_storage_dir)
2110
    return None
2111
  return file_storage_dir
2112

    
2113

    
2114
def CreateFileStorageDir(file_storage_dir):
2115
  """Create file storage directory.
2116

2117
  @type file_storage_dir: str
2118
  @param file_storage_dir: directory to create
2119

2120
  @rtype: tuple
2121
  @return: tuple with first element a boolean indicating wheter dir
2122
      creation was successful or not
2123

2124
  """
2125
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2126
  result = True,
2127
  if not file_storage_dir:
2128
    result = False,
2129
  else:
2130
    if os.path.exists(file_storage_dir):
2131
      if not os.path.isdir(file_storage_dir):
2132
        logging.error("'%s' is not a directory", file_storage_dir)
2133
        result = False,
2134
    else:
2135
      try:
2136
        os.makedirs(file_storage_dir, 0750)
2137
      except OSError, err:
2138
        logging.error("Cannot create file storage directory '%s': %s",
2139
                      file_storage_dir, err)
2140
        result = False,
2141
  return result
2142

    
2143

    
2144
def RemoveFileStorageDir(file_storage_dir):
2145
  """Remove file storage directory.
2146

2147
  Remove it only if it's empty. If not log an error and return.
2148

2149
  @type file_storage_dir: str
2150
  @param file_storage_dir: the directory we should cleanup
2151
  @rtype: tuple (success,)
2152
  @return: tuple of one element, C{success}, denoting
2153
      whether the operation was successful
2154

2155
  """
2156
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2157
  result = True,
2158
  if not file_storage_dir:
2159
    result = False,
2160
  else:
2161
    if os.path.exists(file_storage_dir):
2162
      if not os.path.isdir(file_storage_dir):
2163
        logging.error("'%s' is not a directory", file_storage_dir)
2164
        result = False,
2165
      # deletes dir only if empty, otherwise we want to return False
2166
      try:
2167
        os.rmdir(file_storage_dir)
2168
      except OSError:
2169
        logging.exception("Cannot remove file storage directory '%s'",
2170
                          file_storage_dir)
2171
        result = False,
2172
  return result
2173

    
2174

    
2175
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2176
  """Rename the file storage directory.
2177

2178
  @type old_file_storage_dir: str
2179
  @param old_file_storage_dir: the current path
2180
  @type new_file_storage_dir: str
2181
  @param new_file_storage_dir: the name we should rename to
2182
  @rtype: tuple (success,)
2183
  @return: tuple of one element, C{success}, denoting
2184
      whether the operation was successful
2185

2186
  """
2187
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2188
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2189
  result = True,
2190
  if not old_file_storage_dir or not new_file_storage_dir:
2191
    result = False,
2192
  else:
2193
    if not os.path.exists(new_file_storage_dir):
2194
      if os.path.isdir(old_file_storage_dir):
2195
        try:
2196
          os.rename(old_file_storage_dir, new_file_storage_dir)
2197
        except OSError:
2198
          logging.exception("Cannot rename '%s' to '%s'",
2199
                            old_file_storage_dir, new_file_storage_dir)
2200
          result =  False,
2201
      else:
2202
        logging.error("'%s' is not a directory", old_file_storage_dir)
2203
        result = False,
2204
    else:
2205
      if os.path.exists(old_file_storage_dir):
2206
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2207
                      old_file_storage_dir, new_file_storage_dir)
2208
        result = False,
2209
  return result
2210

    
2211

    
2212
def _IsJobQueueFile(file_name):
2213
  """Checks whether the given filename is in the queue directory.
2214

2215
  @type file_name: str
2216
  @param file_name: the file name we should check
2217
  @rtype: boolean
2218
  @return: whether the file is under the queue directory
2219

2220
  """
2221
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
2222
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2223

    
2224
  if not result:
2225
    logging.error("'%s' is not a file in the queue directory",
2226
                  file_name)
2227

    
2228
  return result
2229

    
2230

    
2231
def JobQueueUpdate(file_name, content):
2232
  """Updates a file in the queue directory.
2233

2234
  This is just a wrapper over L{utils.WriteFile}, with proper
2235
  checking.
2236

2237
  @type file_name: str
2238
  @param file_name: the job file name
2239
  @type content: str
2240
  @param content: the new job contents
2241
  @rtype: boolean
2242
  @return: the success of the operation
2243

2244
  """
2245
  if not _IsJobQueueFile(file_name):
2246
    return False
2247

    
2248
  # Write and replace the file atomically
2249
  utils.WriteFile(file_name, data=_Decompress(content))
2250

    
2251
  return True
2252

    
2253

    
2254
def JobQueueRename(old, new):
2255
  """Renames a job queue file.
2256

2257
  This is just a wrapper over os.rename with proper checking.
2258

2259
  @type old: str
2260
  @param old: the old (actual) file name
2261
  @type new: str
2262
  @param new: the desired file name
2263
  @rtype: boolean
2264
  @return: the success of the operation
2265

2266
  """
2267
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2268
    return False
2269

    
2270
  utils.RenameFile(old, new, mkdir=True)
2271

    
2272
  return True
2273

    
2274

    
2275
def JobQueueSetDrainFlag(drain_flag):
2276
  """Set the drain flag for the queue.
2277

2278
  This will set or unset the queue drain flag.
2279

2280
  @type drain_flag: boolean
2281
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
2282
  @rtype: boolean
2283
  @return: always True
2284
  @warning: the function always returns True
2285

2286
  """
2287
  if drain_flag:
2288
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2289
  else:
2290
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2291

    
2292
  return True
2293

    
2294

    
2295
def BlockdevClose(instance_name, disks):
2296
  """Closes the given block devices.
2297

2298
  This means they will be switched to secondary mode (in case of
2299
  DRBD).
2300

2301
  @param instance_name: if the argument is not empty, the symlinks
2302
      of this instance will be removed
2303
  @type disks: list of L{objects.Disk}
2304
  @param disks: the list of disks to be closed
2305
  @rtype: tuple (success, message)
2306
  @return: a tuple of success and message, where success
2307
      indicates the succes of the operation, and message
2308
      which will contain the error details in case we
2309
      failed
2310

2311
  """
2312
  bdevs = []
2313
  for cf in disks:
2314
    rd = _RecursiveFindBD(cf)
2315
    if rd is None:
2316
      return (False, "Can't find device %s" % cf)
2317
    bdevs.append(rd)
2318

    
2319
  msg = []
2320
  for rd in bdevs:
2321
    try:
2322
      rd.Close()
2323
    except errors.BlockDeviceError, err:
2324
      msg.append(str(err))
2325
  if msg:
2326
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
2327
  else:
2328
    if instance_name:
2329
      _RemoveBlockDevLinks(instance_name, disks)
2330
    return (True, "All devices secondary")
2331

    
2332

    
2333
def ValidateHVParams(hvname, hvparams):
2334
  """Validates the given hypervisor parameters.
2335

2336
  @type hvname: string
2337
  @param hvname: the hypervisor name
2338
  @type hvparams: dict
2339
  @param hvparams: the hypervisor parameters to be validated
2340
  @rtype: tuple (success, message)
2341
  @return: a tuple of success and message, where success
2342
      indicates the succes of the operation, and message
2343
      which will contain the error details in case we
2344
      failed
2345

2346
  """
2347
  try:
2348
    hv_type = hypervisor.GetHypervisor(hvname)
2349
    hv_type.ValidateParameters(hvparams)
2350
    return (True, "Validation passed")
2351
  except errors.HypervisorError, err:
2352
    return (False, str(err))
2353

    
2354

    
2355
def DemoteFromMC():
2356
  """Demotes the current node from master candidate role.
2357

2358
  """
2359
  # try to ensure we're not the master by mistake
2360
  master, myself = ssconf.GetMasterAndMyself()
2361
  if master == myself:
2362
    return (False, "ssconf status shows I'm the master node, will not demote")
2363
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2364
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2365
    return (False, "The master daemon is running, will not demote")
2366
  try:
2367
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2368
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2369
  except EnvironmentError, err:
2370
    if err.errno != errno.ENOENT:
2371
      return (False, "Error while backing up cluster file: %s" % str(err))
2372
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2373
  return (True, "Done")
2374

    
2375

    
2376
def _FindDisks(nodes_ip, disks):
2377
  """Sets the physical ID on disks and returns the block devices.
2378

2379
  """
2380
  # set the correct physical ID
2381
  my_name = utils.HostInfo().name
2382
  for cf in disks:
2383
    cf.SetPhysicalID(my_name, nodes_ip)
2384

    
2385
  bdevs = []
2386

    
2387
  for cf in disks:
2388
    rd = _RecursiveFindBD(cf)
2389
    if rd is None:
2390
      return (False, "Can't find device %s" % cf)
2391
    bdevs.append(rd)
2392
  return (True, bdevs)
2393

    
2394

    
2395
def DrbdDisconnectNet(nodes_ip, disks):
2396
  """Disconnects the network on a list of drbd devices.
2397

2398
  """
2399
  status, bdevs = _FindDisks(nodes_ip, disks)
2400
  if not status:
2401
    return status, bdevs
2402

    
2403
  # disconnect disks
2404
  for rd in bdevs:
2405
    try:
2406
      rd.DisconnectNet()
2407
    except errors.BlockDeviceError, err:
2408
      logging.exception("Failed to go into standalone mode")
2409
      return (False, "Can't change network configuration: %s" % str(err))
2410
  return (True, "All disks are now disconnected")
2411

    
2412

    
2413
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2414
  """Attaches the network on a list of drbd devices.
2415

2416
  """
2417
  status, bdevs = _FindDisks(nodes_ip, disks)
2418
  if not status:
2419
    return status, bdevs
2420

    
2421
  if multimaster:
2422
    for idx, rd in enumerate(bdevs):
2423
      try:
2424
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2425
      except EnvironmentError, err:
2426
        return (False, "Can't create symlink: %s" % str(err))
2427
  # reconnect disks, switch to new master configuration and if
2428
  # needed primary mode
2429
  for rd in bdevs:
2430
    try:
2431
      rd.AttachNet(multimaster)
2432
    except errors.BlockDeviceError, err:
2433
      return (False, "Can't change network configuration: %s" % str(err))
2434
  # wait until the disks are connected; we need to retry the re-attach
2435
  # if the device becomes standalone, as this might happen if the one
2436
  # node disconnects and reconnects in a different mode before the
2437
  # other node reconnects; in this case, one or both of the nodes will
2438
  # decide it has wrong configuration and switch to standalone
2439
  RECONNECT_TIMEOUT = 2 * 60
2440
  sleep_time = 0.100 # start with 100 miliseconds
2441
  timeout_limit = time.time() + RECONNECT_TIMEOUT
2442
  while time.time() < timeout_limit:
2443
    all_connected = True
2444
    for rd in bdevs:
2445
      stats = rd.GetProcStatus()
2446
      if not (stats.is_connected or stats.is_in_resync):
2447
        all_connected = False
2448
      if stats.is_standalone:
2449
        # peer had different config info and this node became
2450
        # standalone, even though this should not happen with the
2451
        # new staged way of changing disk configs
2452
        try:
2453
          rd.AttachNet(multimaster)
2454
        except errors.BlockDeviceError, err:
2455
          return (False, "Can't change network configuration: %s" % str(err))
2456
    if all_connected:
2457
      break
2458
    time.sleep(sleep_time)
2459
    sleep_time = min(5, sleep_time * 1.5)
2460
  if not all_connected:
2461
    return (False, "Timeout in disk reconnecting")
2462
  if multimaster:
2463
    # change to primary mode
2464
    for rd in bdevs:
2465
      try:
2466
        rd.Open()
2467
      except errors.BlockDeviceError, err:
2468
        return (False, "Can't change to primary mode: %s" % str(err))
2469
  if multimaster:
2470
    msg = "multi-master and primary"
2471
  else:
2472
    msg = "single-master"
2473
  return (True, "Disks are now configured as %s" % msg)
2474

    
2475

    
2476
def DrbdWaitSync(nodes_ip, disks):
2477
  """Wait until DRBDs have synchronized.
2478

2479
  """
2480
  status, bdevs = _FindDisks(nodes_ip, disks)
2481
  if not status:
2482
    return status, bdevs
2483

    
2484
  min_resync = 100
2485
  alldone = True
2486
  failure = False
2487
  for rd in bdevs:
2488
    stats = rd.GetProcStatus()
2489
    if not (stats.is_connected or stats.is_in_resync):
2490
      failure = True
2491
      break
2492
    alldone = alldone and (not stats.is_in_resync)
2493
    if stats.sync_percent is not None:
2494
      min_resync = min(min_resync, stats.sync_percent)
2495
  return (not failure, (alldone, min_resync))
2496

    
2497

    
2498
class HooksRunner(object):
2499
  """Hook runner.
2500

2501
  This class is instantiated on the node side (ganeti-noded) and not
2502
  on the master side.
2503

2504
  """
2505
  def __init__(self, hooks_base_dir=None):
2506
    """Constructor for hooks runner.
2507

2508
    @type hooks_base_dir: str or None
2509
    @param hooks_base_dir: if not None, this overrides the
2510
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
2511

2512
    """
2513
    if hooks_base_dir is None:
2514
      hooks_base_dir = constants.HOOKS_BASE_DIR
2515
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
2516
    # constant
2517
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
2518

    
2519
  @staticmethod
2520
  def ExecHook(script, env):
2521
    """Exec one hook script.
2522

2523
    @type script: str
2524
    @param script: the full path to the script
2525
    @type env: dict
2526
    @param env: the environment with which to exec the script
2527
    @rtype: tuple (success, message)
2528
    @return: a tuple of success and message, where success
2529
        indicates the succes of the operation, and message
2530
        which will contain the error details in case we
2531
        failed
2532

2533
    """
2534
    # exec the process using subprocess and log the output
2535
    fdstdin = None
2536
    try:
2537
      fdstdin = open("/dev/null", "r")
2538
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2539
                               stderr=subprocess.STDOUT, close_fds=True,
2540
                               shell=False, cwd="/", env=env)
2541
      output = ""
2542
      try:
2543
        output = child.stdout.read(4096)
2544
        child.stdout.close()
2545
      except EnvironmentError, err:
2546
        output += "Hook script error: %s" % str(err)
2547

    
2548
      while True:
2549
        try:
2550
          result = child.wait()
2551
          break
2552
        except EnvironmentError, err:
2553
          if err.errno == errno.EINTR:
2554
            continue
2555
          raise
2556
    finally:
2557
      # try not to leak fds
2558
      for fd in (fdstdin, ):
2559
        if fd is not None:
2560
          try:
2561
            fd.close()
2562
          except EnvironmentError, err:
2563
            # just log the error
2564
            #logging.exception("Error while closing fd %s", fd)
2565
            pass
2566

    
2567
    return result == 0, utils.SafeEncode(output.strip())
2568

    
2569
  def RunHooks(self, hpath, phase, env):
2570
    """Run the scripts in the hooks directory.
2571

2572
    @type hpath: str
2573
    @param hpath: the path to the hooks directory which
2574
        holds the scripts
2575
    @type phase: str
2576
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
2577
        L{constants.HOOKS_PHASE_POST}
2578
    @type env: dict
2579
    @param env: dictionary with the environment for the hook
2580
    @rtype: list
2581
    @return: list of 3-element tuples:
2582
      - script path
2583
      - script result, either L{constants.HKR_SUCCESS} or
2584
        L{constants.HKR_FAIL}
2585
      - output of the script
2586

2587
    @raise errors.ProgrammerError: for invalid input
2588
        parameters
2589

2590
    """
2591
    if phase == constants.HOOKS_PHASE_PRE:
2592
      suffix = "pre"
2593
    elif phase == constants.HOOKS_PHASE_POST:
2594
      suffix = "post"
2595
    else:
2596
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2597
    rr = []
2598

    
2599
    subdir = "%s-%s.d" % (hpath, suffix)
2600
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2601
    try:
2602
      dir_contents = utils.ListVisibleFiles(dir_name)
2603
    except OSError:
2604
      # FIXME: must log output in case of failures
2605
      return rr
2606

    
2607
    # we use the standard python sort order,
2608
    # so 00name is the recommended naming scheme
2609
    dir_contents.sort()
2610
    for relname in dir_contents:
2611
      fname = os.path.join(dir_name, relname)
2612
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2613
              constants.EXT_PLUGIN_MASK.match(relname) is not None):
2614
        rrval = constants.HKR_SKIP
2615
        output = ""
2616
      else:
2617
        result, output = self.ExecHook(fname, env)
2618
        if not result:
2619
          rrval = constants.HKR_FAIL
2620
        else:
2621
          rrval = constants.HKR_SUCCESS
2622
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
2623

    
2624
    return rr
2625

    
2626

    
2627
class IAllocatorRunner(object):
2628
  """IAllocator runner.
2629

2630
  This class is instantiated on the node side (ganeti-noded) and not on
2631
  the master side.
2632

2633
  """
2634
  def Run(self, name, idata):
2635
    """Run an iallocator script.
2636

2637
    @type name: str
2638
    @param name: the iallocator script name
2639
    @type idata: str
2640
    @param idata: the allocator input data
2641

2642
    @rtype: tuple
2643
    @return: four element tuple of:
2644
       - run status (one of the IARUN_ constants)
2645
       - stdout
2646
       - stderr
2647
       - fail reason (as from L{utils.RunResult})
2648

2649
    """
2650
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2651
                                  os.path.isfile)
2652
    if alloc_script is None:
2653
      return (constants.IARUN_NOTFOUND, None, None, None)
2654

    
2655
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2656
    try:
2657
      os.write(fd, idata)
2658
      os.close(fd)
2659
      result = utils.RunCmd([alloc_script, fin_name])
2660
      if result.failed:
2661
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2662
                result.fail_reason)
2663
    finally:
2664
      os.unlink(fin_name)
2665

    
2666
    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2667

    
2668

    
2669
class DevCacheManager(object):
2670
  """Simple class for managing a cache of block device information.
2671

2672
  """
2673
  _DEV_PREFIX = "/dev/"
2674
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2675

    
2676
  @classmethod
2677
  def _ConvertPath(cls, dev_path):
2678
    """Converts a /dev/name path to the cache file name.
2679

2680
    This replaces slashes with underscores and strips the /dev
2681
    prefix. It then returns the full path to the cache file.
2682

2683
    @type dev_path: str
2684
    @param dev_path: the C{/dev/} path name
2685
    @rtype: str
2686
    @return: the converted path name
2687

2688
    """
2689
    if dev_path.startswith(cls._DEV_PREFIX):
2690
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2691
    dev_path = dev_path.replace("/", "_")
2692
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2693
    return fpath
2694

    
2695
  @classmethod
2696
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2697
    """Updates the cache information for a given device.
2698

2699
    @type dev_path: str
2700
    @param dev_path: the pathname of the device
2701
    @type owner: str
2702
    @param owner: the owner (instance name) of the device
2703
    @type on_primary: bool
2704
    @param on_primary: whether this is the primary
2705
        node nor not
2706
    @type iv_name: str
2707
    @param iv_name: the instance-visible name of the
2708
        device, as in objects.Disk.iv_name
2709

2710
    @rtype: None
2711

2712
    """
2713
    if dev_path is None:
2714
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2715
      return
2716
    fpath = cls._ConvertPath(dev_path)
2717
    if on_primary:
2718
      state = "primary"
2719
    else:
2720
      state = "secondary"
2721
    if iv_name is None:
2722
      iv_name = "not_visible"
2723
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2724
    try:
2725
      utils.WriteFile(fpath, data=fdata)
2726
    except EnvironmentError:
2727
      logging.exception("Can't update bdev cache for %s", dev_path)
2728

    
2729
  @classmethod
2730
  def RemoveCache(cls, dev_path):
2731
    """Remove data for a dev_path.
2732

2733
    This is just a wrapper over L{utils.RemoveFile} with a converted
2734
    path name and logging.
2735

2736
    @type dev_path: str
2737
    @param dev_path: the pathname of the device
2738

2739
    @rtype: None
2740

2741
    """
2742
    if dev_path is None:
2743
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2744
      return
2745
    fpath = cls._ConvertPath(dev_path)
2746
    try:
2747
      utils.RemoveFile(fpath)
2748
    except EnvironmentError:
2749
      logging.exception("Can't update bdev cache for %s", dev_path)