Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ afdc3985

History | View | Annotate | Download (81.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36
import zlib
37
import base64
38

    
39
from ganeti import errors
40
from ganeti import utils
41
from ganeti import ssh
42
from ganeti import hypervisor
43
from ganeti import constants
44
from ganeti import bdev
45
from ganeti import objects
46
from ganeti import ssconf
47

    
48

    
49
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message, already formatted. Raised (via
  L{_Fail}) by the functions in this module and handled specially in
  the node daemon, where it is turned into a 'failed' RPC return type.

  """
55

    
56
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception; %-formatted with C{args}
      when any positional arguments are given
  @raise RPCFail

  """
  if args:
    msg = msg % args
  # logging can be suppressed by passing log=False
  if kwargs.get("log", True):
    if kwargs.get("exc"):
      # exc=True: attach the active exception's traceback to the log entry
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
77

    
78

    
79
def _GetConfig():
  """Build and return a fresh SimpleStore wrapper.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  store = ssconf.SimpleStore()
  return store
87

    
88

    
89
def _GetSshRunner(cluster_name):
  """Build an SshRunner bound to the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
100

    
101

    
102
def _Decompress(data):
  """Unpack data compressed by the RPC client.

  @type data: list or tuple
  @param data: (encoding, content) pair sent by the RPC client
  @rtype: str
  @return: the decompressed payload

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  encoding, content = data
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # the content is base64-wrapped zlib data; undo both layers
    return zlib.decompress(base64.b64decode(content))
  if encoding == constants.RPC_ENCODING_NONE:
    # uncompressed payload, pass it through unchanged
    return content
  raise AssertionError("Unknown data encoding")
120

    
121

    
122
def _CleanDirectory(path, exclude=None):
  """Remove all regular files in a directory.

  Symlinks and subdirectories are left alone; only plain files which
  are not listed in C{exclude} are deleted.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    excluded = []
  else:
    # normalize the excluded paths so the comparison below is reliable
    excluded = [os.path.normpath(entry) for entry in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in excluded:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
146

    
147

    
148
def JobQueuePurge():
  """Remove job queue files and archived jobs.

  @rtype: tuple
  @return: True, None

  """
  # the queue lock file must survive the purge, everything else goes
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
  return (True, None)
158

    
159

    
160
def GetMasterInfo():
161
  """Returns master information.
162

163
  This is an utility function to compute master information, either
164
  for consumption here or from the node daemon.
165

166
  @rtype: tuple
167
  @return: True, (master_netdev, master_ip, master_name) in case of success
168
  @raise RPCFail: in case of errors
169

170
  """
171
  try:
172
    cfg = _GetConfig()
173
    master_netdev = cfg.GetMasterNetdev()
174
    master_ip = cfg.GetMasterIP()
175
    master_node = cfg.GetMasterNode()
176
  except errors.ConfigurationError, err:
177
    _Fail("Cluster configuration incomplete", exc=True)
178
  return True, (master_netdev, master_ip, master_node)
179

    
180

    
181
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @rtype: None
  @raise RPCFail: if any step failed; the messages of all failed
      steps are joined into the exception text

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()[1]

  # collect error messages instead of bailing out early, so that as
  # many steps as possible are attempted
  payload = []
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      msg = "Someone else has the master ip, not activating"
      logging.error(msg)
      payload.append(msg)
  else:
    # the master IP is not reachable: add it as a labelled /32 alias on
    # the master network device
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      msg = "Can't activate master IP: %s" % result.output
      logging.error(msg)
      payload.append(msg)

    # send gratuitous ARP replies so neighbours update their ARP caches
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        msg = "Can't start daemon %s: %s" % (daemon, result.output)
        logging.error(msg)
        payload.append(msg)

  if payload:
    _Fail("; ".join(payload))

  return True, None
232

    
233

    
234
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()[1]

  # remove the /32 alias added by StartMaster
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      # look up the daemon's pid from its pidfile and kill it
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True, None
265

    
266

    
267
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: boolean
  @return: the success of the operation

  """
  # replace the host keys; private keys are written mode 0600, public
  # keys world-readable (0644)
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    _Fail("Error while processing user ssh files: %s", err, exc=True)

  # install the cluster-wide user key pair
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # restart sshd so the new host keys take effect; exit code is ignored
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])

  return (True, "Node added successfully")
312

    
313

    
314
def LeaveCluster():
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  """
  # wipe this node's data and job queue
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

    # revoke the cluster key from authorized_keys and delete the pair
    f = open(pub_key, 'r')
    try:
      utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
    finally:
      f.close()

    utils.RemoveFile(priv_key)
    utils.RemoveFile(pub_key)
  except errors.OpExecError:
    # best-effort: log and proceed with the shutdown below
    logging.exception("Error while processing ssh files")

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
344

    
345

    
346
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  outputarray = {}
  # volume group data; values may be None if the VG query failed
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  # merge in the hypervisor-provided node data (e.g. memory figures)
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # the boot id changes on every reboot, letting the master detect
  # node restarts
  f = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = f.read(128).rstrip("\n")
  finally:
    f.close()

  return True, outputarray
380

    
381

    
382
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  # per-hypervisor self-verification
  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # shuffle so that not all nodes hammer the same targets in order
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        # only failures are reported back
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own entry to learn which source IPs to ping from
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # only test the secondary IP when it differs from the primary
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      # on error the result value is the error string, not a list
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  return True, result
482

    
483

    
484
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  # raw string: '\|' must reach the regex engine as an escaped pipe,
  # not be (mis)interpreted as a string escape
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # attr is the six-character lv_attr field: position 4 is the LV
    # state ('-' meaning inactive) and position 5 the open status
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
521

    
522

    
523
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  # delegate the actual work to the utils helper
  vgs = utils.ListVolumeGroups()
  return (True, vgs)
532

    
533

    
534
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    # lvs reports devices as "/dev/xxx(extent)"; strip the extent part
    if '(' in dev:
      return dev.split('(')[0]
    else:
      return dev

  def map_line(line):
    # line is the already-split (on '|') lvs output record
    return {
      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),
    }

  # keep only lines with at least four fields (three separators)
  return True, [map_line(line.split('|'))
                for line in result.stdout.splitlines()
                if line.count('|') >= 3]
577

    
578

    
579
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  # gather every bridge that is not present, so they can all be
  # reported in a single error message
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]

  if missing:
    _Fail("Missing bridges %s", ", ".join(missing))

  return (True, None)
595

    
596

    
597
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  # aggregate the instance names over all requested hypervisors
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  # NOTE: unlike most functions in this module, this returns a bare
  # list rather than a (status, payload) tuple
  return results
619

    
620

    
621
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # pick only the fields the callers use out of the hypervisor's
    # info tuple; an unknown instance yields an empty dict
    output = {
      'memory': iinfo[2],
      'state': iinfo[4],
      'time': iinfo[5],
      }

  return True, output
645

    
646

    
647
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # every disk must have its symlink in place; missing links mean the
  # instance was started by an older ganeti version
  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)

  return True, None
670

    
671

    
672
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      # iinfo entries are (name, id, memory, vcpus, state, times) tuples
      for name, inst_id, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # an instance seen under more than one hypervisor:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return True, output
713

    
714

    
715
def InstanceOsAdd(instance, reinstall):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)


  create_env = OSEnvironment(instance)
  if reinstall:
    # signal the OS create script that this is a reinstall
    create_env['INSTANCE_REINSTALL'] = "1"

  # per-run log file, named after the OS, instance and timestamp
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    # return the tail of the log to the master for easier debugging;
    # log=False since we already logged the failure above
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)

  return (True, "Successfully installed")
748

    
749

    
750
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  # per-run log file, named after the OS, both names and a timestamp
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # this is the rename script, so log it as such (the message used to
    # wrongly say "os create command")
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    # return the tail of the log for debugging; log=False since we
    # already logged the failure above
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)

  return (True, "Rename successful")
782

    
783

    
784
def _GetVGInfo(vg_name):
  """Get informations about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  # start with all values None; only overwritten on full success
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  # strip the trailing separator before splitting the ':'-separated fields
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      # sizes are reported as floats in MiB; round to whole MiB
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      logging.exception("Fail to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
822

    
823

    
824
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for the given instance disk index."""
  link_basename = "%s:%d" % (instance_name, idx)
  return os.path.join(constants.DISK_LINKS_DIR, link_basename)
827

    
828

    
829
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).


  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      # a link already exists: replace it only if it points elsewhere
      # (or is not a symlink at all)
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      # any other OS error is propagated to the caller
      raise

  return link_name
855

    
856

    
857
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @type instance_name: str
  @param instance_name: the name of the instance whose links are removed
  @type disks: list
  @param disks: the instance's disks; only the count matters, as one
      symlink per disk index is removed

  """
  # the disk objects themselves are not needed, only their indices
  # (the previous code bound an unused 'disk' loop variable)
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        # best-effort cleanup: log and continue with the remaining links
        logging.exception("Can't remove symlink '%s'", link_name)
868

    
869

    
870
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we shoul assemble
  @rtype: list
  @return: list of (disk_object, device_path)
  @raise errors.BlockDeviceError: if a device cannot be found or the
      symlink cannot be created

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      # create the per-disk symlink used by the hypervisor
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices
898

    
899

    
900
def StartInstance(instance):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: boolean
  @return: whether the startup was successful or not

  """
  running_instances = GetInstanceList([instance.hypervisor])

  # starting an already-running instance is a no-op, not an error
  if instance.name in running_instances:
    return (True, "Already running")

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    # the links were created above but the start failed: clean them up
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)

  return (True, "Instance started successfully")
925

    
926

    
927
def InstanceShutdown(instance):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: boolean
  @return: whether the shutdown was successful or not

  """
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])

  if instance.name not in running_instances:
    return (True, "Instance already stopped")

  hyper = hypervisor.GetHypervisor(hv_name)
  try:
    # first, request a clean (soft) shutdown
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to stop instance %s: %s", instance.name, err)

  # test every 10secs for 2min
  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList([hv_name]):
      break
    time.sleep(10)
  else:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, using destroy",
                  instance.name)

    try:
      # escalate: forcibly destroy the domain
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      _Fail("Failed to force stop instance %s: %s", instance.name, err)

    # give the hypervisor a moment to reflect the destroy
    time.sleep(1)
    if instance.name in GetInstanceList([hv_name]):
      _Fail("Could not shutdown instance %s even by destroy", instance.name)

  # the instance is down, so its device symlinks can be removed
  _RemoveBlockDevLinks(instance.name, instance.disks)

  return (True, "Instance has been shutdown successfully")
974

    
975

    
976
def InstanceReboot(instance, reboot_type):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL})
        is not accepted here, since that mode is handled
        differently
  @rtype: boolean
  @return: the success of the operation

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    # soft reboot: ask the hypervisor to reboot inside the VM
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    # hard reboot: full stop/start cycle at the hypervisor level
    try:
      stop_result = InstanceShutdown(instance)
      if not stop_result[0]:
        return stop_result
      return StartInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)

  return (True, "Reboot successful")
1018

    
1019

    
1020
def MigrationInfo(instance):
1021
  """Gather information about an instance to be migrated.
1022

1023
  @type instance: L{objects.Instance}
1024
  @param instance: the instance definition
1025

1026
  """
1027
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1028
  try:
1029
    info = hyper.MigrationInfo(instance)
1030
  except errors.HypervisorError, err:
1031
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1032
  return (True, info)
1033

    
1034

    
1035
def AcceptInstance(instance, info, target):
1036
  """Prepare the node to accept an instance.
1037

1038
  @type instance: L{objects.Instance}
1039
  @param instance: the instance definition
1040
  @type info: string/data (opaque)
1041
  @param info: migration information, from the source node
1042
  @type target: string
1043
  @param target: target host (usually ip), on this node
1044

1045
  """
1046
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1047
  try:
1048
    hyper.AcceptInstance(instance, info, target)
1049
  except errors.HypervisorError, err:
1050
    _Fail("Failed to accept instance: %s", err, exc=True)
1051
  return (True, "Accept successfull")
1052

    
1053

    
1054
def FinalizeMigration(instance, info, success):
1055
  """Finalize any preparation to accept an instance.
1056

1057
  @type instance: L{objects.Instance}
1058
  @param instance: the instance definition
1059
  @type info: string/data (opaque)
1060
  @param info: migration information, from the source node
1061
  @type success: boolean
1062
  @param success: whether the migration was a success or a failure
1063

1064
  """
1065
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1066
  try:
1067
    hyper.FinalizeMigration(instance, info, success)
1068
  except errors.HypervisorError, err:
1069
    _Fail("Failed to finalize migration: %s", err, exc=True)
1070
  return (True, "Migration Finalized")
1071

    
1072

    
1073
def MigrateInstance(instance, target, live):
1074
  """Migrates an instance to another node.
1075

1076
  @type instance: L{objects.Instance}
1077
  @param instance: the instance definition
1078
  @type target: string
1079
  @param target: the target node name
1080
  @type live: boolean
1081
  @param live: whether the migration should be done live or not (the
1082
      interpretation of this parameter is left to the hypervisor)
1083
  @rtype: tuple
1084
  @return: a tuple of (success, msg) where:
1085
      - succes is a boolean denoting the success/failure of the operation
1086
      - msg is a string with details in case of failure
1087

1088
  """
1089
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1090

    
1091
  try:
1092
    hyper.MigrateInstance(instance.name, target, live)
1093
  except errors.HypervisorError, err:
1094
    _Fail("Failed to migrate instance: %s", err, exc=True)
1095
  return (True, "Migration successfull")
1096

    
1097

    
1098
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # first, assemble all the children (if any), so they can back the
  # new device
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      # force, since this is right after creation and no data exists yet
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  # tag the new device (e.g. LVM tags) with the caller-supplied info
  device.SetInfo(info)

  # the unique_id may only be known after actual creation
  physical_id = device.unique_id
  return True, physical_id
1158

    
1159

    
1160
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  result = True
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    # remember the path before removal, for the cache cleanup below
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
      result = False
    if result:
      DevCacheManager.RemoveCache(r_path)

  # recurse into the children even if the parent removal failed, so we
  # clean up as much as possible; errors are accumulated in msgs
  if disk.children:
    for child in disk.children:
      c_status, c_msg = BlockdevRemove(child)
      result = result and c_status
      if c_msg: # not an empty message
        msgs.append(c_msg)

  if not result:
    _Fail("; ".join(msgs))

  return True, None
1200

    
1201

    
1202
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # mcn ends up as the maximum number of children that are allowed
    # to fail assembly (represented as None entries) before we give up
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # too many children already failed: propagate the error
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    # nothing to assemble on this node; report success without a device
    result = True
  return result
1254

    
1255

    
1256
def BlockdevAssemble(disk, owner, as_primary):
1257
  """Activate a block device for an instance.
1258

1259
  This is a wrapper over _RecursiveAssembleBD.
1260

1261
  @rtype: str or boolean
1262
  @return: a C{/dev/...} path for primary nodes, and
1263
      C{True} for secondary nodes
1264

1265
  """
1266
  try:
1267
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1268
    if isinstance(result, bdev.BlockDev):
1269
      result = result.dev_path
1270
  except errors.BlockDeviceError, err:
1271
    _Fail("Error while assembling disk: %s", err, exc=True)
1272

    
1273
  return True, result
1274

    
1275

    
1276
def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successfull), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  result = True
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
      result = False

  # shut down children even if the parent failed, accumulating errors
  if disk.children:
    for child in disk.children:
      c_status, c_msg = BlockdevShutdown(child)
      result = result and c_status
      if c_msg: # not an empty message
        msgs.append(c_msg)

  if not result:
    _Fail("; ".join(msgs))
  return (True, None)
1316

    
1317

    
1318
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  # resolve every new child; any that cannot be found shows up as None
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
  return (True, None)
1337

    
1338

    
1339
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    # prefer the disk's static path when it has one, otherwise look
    # up the live device and use its current path
    static_path = disk.StaticDevPath()
    if static_path is not None:
      devs.append(static_path)
      continue
    found = _RecursiveFindBD(disk)
    if found is None:
      _Fail("Can't find device %s while removing children", disk)
    devs.append(found.dev_path)
  parent_bdev.RemoveChildren(devs)
  return (True, None)
1366

    
1367

    
1368
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for disk in disks:
    device = _RecursiveFindBD(disk)
    if device is None:
      _Fail("Can't find device %s", disk)
    stats.append(device.CombinedSyncStatus())
  return True, stats
1388

    
1389

    
1390
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  # resolve children first (recursively); leaf disks get an empty list
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1408

    
1409

    
1410
def BlockdevFind(disk):
1411
  """Check if a device is activated.
1412

1413
  If it is, return informations about the real device.
1414

1415
  @type disk: L{objects.Disk}
1416
  @param disk: the disk to find
1417
  @rtype: None or tuple
1418
  @return: None if the disk cannot be found, otherwise a
1419
      tuple (device_path, major, minor, sync_percent,
1420
      estimated_time, is_degraded)
1421

1422
  """
1423
  try:
1424
    rbd = _RecursiveFindBD(disk)
1425
  except errors.BlockDeviceError, err:
1426
    _Fail("Failed to find device: %s", err, exc=True)
1427
  if rbd is None:
1428
    return (True, None)
1429
  return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1430

    
1431

    
1432
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: boolean
  @return: the success of the operation; errors are logged
      in the node daemon log

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  # whitelist of files the master may overwrite; anything else is
  # rejected to limit the damage a compromised master could do
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    ])

  # each hypervisor may declare extra config files it needs distributed
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisor(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  if file_name not in allowed_files:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  # data is transferred compressed (see _Decompress for the format)
  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return (True, "success")
1482

    
1483

    
1484
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
  return (True, None)
1492

    
1493

    
1494
def _ErrnoOrStr(err):
1495
  """Format an EnvironmentError exception.
1496

1497
  If the L{err} argument has an errno attribute, it will be looked up
1498
  and converted into a textual C{E...} description. Otherwise the
1499
  string representation of the error will be returned.
1500

1501
  @type err: L{EnvironmentError}
1502
  @param err: the exception to format
1503

1504
  """
1505
  if hasattr(err, 'errno'):
1506
    detail = errno.errorcode[err.errno]
1507
  else:
1508
    detail = str(err)
1509
  return detail
1510

    
1511

    
1512
def _OSOndiskVersion(name, os_dir):
1513
  """Compute and return the API version of a given OS.
1514

1515
  This function will try to read the API version of the OS given by
1516
  the 'name' parameter and residing in the 'os_dir' directory.
1517

1518
  @type name: str
1519
  @param name: the OS name we should look for
1520
  @type os_dir: str
1521
  @param os_dir: the directory inwhich we should look for the OS
1522
  @rtype: tuple
1523
  @return: tuple (status, data) with status denoting the validity and
1524
      data holding either the vaid versions or an error message
1525

1526
  """
1527
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1528

    
1529
  try:
1530
    st = os.stat(api_file)
1531
  except EnvironmentError, err:
1532
    return False, ("Required file 'ganeti_api_version' file not"
1533
                   " found under path %s: %s" % (os_dir, _ErrnoOrStr(err)))
1534

    
1535
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1536
    return False, ("File 'ganeti_api_version' file at %s is not"
1537
                   " a regular file" % os_dir)
1538

    
1539
  try:
1540
    f = open(api_file)
1541
    try:
1542
      api_versions = f.readlines()
1543
    finally:
1544
      f.close()
1545
  except EnvironmentError, err:
1546
    return False, ("Error while reading the API version file at %s: %s" %
1547
                   (api_file, _ErrnoOrStr(err)))
1548

    
1549
  api_versions = [version.strip() for version in api_versions]
1550
  try:
1551
    api_versions = [int(version) for version in api_versions]
1552
  except (TypeError, ValueError), err:
1553
    return False, ("API version(s) can't be converted to integer: %s" %
1554
                   str(err))
1555

    
1556
  return True, api_versions
1557

    
1558

    
1559
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose)
      for all (potential) OSes under all search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    # silently skip non-existing/non-directory entries in the search path
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s", dir_name)
        # NOTE(review): 'break' aborts scanning the remaining search
        # directories on a listing error — confirm 'continue' was not
        # intended here
        break
      for name in f_names:
        os_path = os.path.sep.join([dir_name, name])
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
        else:
          # on failure, os_inst holds the error message, not an OS object
          diagnose = os_inst
        result.append((name, os_path, status, diagnose))

  return True, result
1596

    
1597

    
1598
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      return False, "Directory for OS %s not found in search path" % name
  else:
    os_dir = os.path.sep.join([base_dir, name])

  status, api_versions = _OSOndiskVersion(name, os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if constants.OS_API_VERSION not in api_versions:
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    # every script must exist, be a regular file and be executable
    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      return False, ("Script '%s' under path '%s' is missing (%s)" %
                     (script, os_dir, _ErrnoOrStr(err)))

    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      return False, ("Script '%s' under path '%s' is not executable" %
                     (script, os_dir))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("Script '%s' under path '%s' is not a regular file" %
                     (script, os_dir))

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_scripts[constants.OS_SCRIPT_CREATE],
                      export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
                      import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
                      api_versions=api_versions)
  return True, os_obj
1655

    
1656

    
1657
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  found, data = _TryOSFromDisk(name, base_dir)

  if not found:
    # on failure, data holds the error message
    _Fail(data)

  return data
1681

    
1682

    
1683
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  # global, per-instance variables
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  # per-disk variables (DISK_<n>_*); the disks must be assembled here
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  # per-NIC variables (NIC_<n>_*)
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  return result
1734

    
1735
def BlockdevGrow(disk, amount):
1736
  """Grow a stack of block devices.
1737

1738
  This function is called recursively, with the childrens being the
1739
  first ones to resize.
1740

1741
  @type disk: L{objects.Disk}
1742
  @param disk: the disk to be grown
1743
  @rtype: (status, result)
1744
  @return: a tuple with the status of the operation
1745
      (True/False), and the errors message if status
1746
      is False
1747

1748
  """
1749
  r_dev = _RecursiveFindBD(disk)
1750
  if r_dev is None:
1751
    _Fail("Cannot find block device %s", disk)
1752

    
1753
  try:
1754
    r_dev.Grow(amount)
1755
  except errors.BlockDeviceError, err:
1756
    _Fail("Failed to grow block device: %s", err, exc=True)
1757

    
1758
  return True, None
1759

    
1760

    
1761
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return BlockdevSnapshot(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return BlockdevSnapshot(child)
      # fix: previously this fell through and implicitly returned None,
      # breaking the (status, payload) result protocol; fail explicitly
      _Fail("No child of disk %s matches its size %s, cannot snapshot",
            disk.unique_id, disk.size)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return True, r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
1793

    
1794

    
1795
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1796
  """Export a block device snapshot to a remote node.
1797

1798
  @type disk: L{objects.Disk}
1799
  @param disk: the description of the disk to export
1800
  @type dest_node: str
1801
  @param dest_node: the destination node to export to
1802
  @type instance: L{objects.Instance}
1803
  @param instance: the instance object to whom the disk belongs
1804
  @type cluster_name: str
1805
  @param cluster_name: the cluster name, needed for SSH hostalias
1806
  @type idx: int
1807
  @param idx: the index of the disk in the instance's disk list,
1808
      used to export to the OS scripts environment
1809
  @rtype: boolean
1810
  @return: the success of the operation
1811

1812
  """
1813
  export_env = OSEnvironment(instance)
1814

    
1815
  inst_os = OSFromDisk(instance.os)
1816
  export_script = inst_os.export_script
1817

    
1818
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1819
                                     instance.name, int(time.time()))
1820
  if not os.path.exists(constants.LOG_OS_DIR):
1821
    os.mkdir(constants.LOG_OS_DIR, 0750)
1822
  real_disk = _RecursiveFindBD(disk)
1823
  if real_disk is None:
1824
    _Fail("Block device '%s' is not set up", disk)
1825

    
1826
  real_disk.Open()
1827

    
1828
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
1829
  export_env['EXPORT_INDEX'] = str(idx)
1830

    
1831
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1832
  destfile = disk.physical_id[1]
1833

    
1834
  # the target command is built out of three individual commands,
1835
  # which are joined by pipes; we check each individual command for
1836
  # valid parameters
1837
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1838
                               export_script, logfile)
1839

    
1840
  comprcmd = "gzip"
1841

    
1842
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1843
                                destdir, destdir, destfile)
1844
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1845
                                                   constants.GANETI_RUNAS,
1846
                                                   destcmd)
1847

    
1848
  # all commands have been checked, so we're safe to combine them
1849
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1850

    
1851
  result = utils.RunCmd(command, env=export_env)
1852

    
1853
  if result.failed:
1854
    _Fail("OS snapshot export command '%s' returned error: %s"
1855
          " output: %s", command, result.fail_reason, result.output)
1856

    
1857
  return (True, None)
1858

    
1859

    
1860
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: boolean
  @return: the success of the operation

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata
  config.add_section(constants.INISECT_EXP)
  for name, value in [('version', '0'),
                      ('timestamp', '%d' % int(time.time())),
                      ('source', instance.primary_node),
                      ('os', instance.os),
                      ('compression', 'gzip')]:
    config.set(constants.INISECT_EXP, name, value)

  # instance-level metadata
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % len(instance.nics))

  # only disks that were actually snapshotted (truthy entries) are counted
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # atomically (from the callers' point of view) replace any previous
  # export of the same instance with the freshly written one
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True, None


def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  config = objects.SerializableConfigParser()
  config.read(os.path.join(dest, constants.EXPORT_CONF_FILE))

  # both the export and the instance sections must be present
  for section in (constants.INISECT_EXP, constants.INISECT_INS):
    if not config.has_section(section):
      _Fail("Export info file doesn't have the required fields")

  return True, config.Dumps()


def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1950
  """Import an os image into an instance.
1951

1952
  @type instance: L{objects.Instance}
1953
  @param instance: instance to import the disks into
1954
  @type src_node: string
1955
  @param src_node: source node for the disk images
1956
  @type src_images: list of string
1957
  @param src_images: absolute paths of the disk images
1958
  @rtype: list of boolean
1959
  @return: each boolean represent the success of importing the n-th disk
1960

1961
  """
1962
  import_env = OSEnvironment(instance)
1963
  inst_os = OSFromDisk(instance.os)
1964
  import_script = inst_os.import_script
1965

    
1966
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1967
                                        instance.name, int(time.time()))
1968
  if not os.path.exists(constants.LOG_OS_DIR):
1969
    os.mkdir(constants.LOG_OS_DIR, 0750)
1970

    
1971
  comprcmd = "gunzip"
1972
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1973
                               import_script, logfile)
1974

    
1975
  final_result = []
1976
  for idx, image in enumerate(src_images):
1977
    if image:
1978
      destcmd = utils.BuildShellCmd('cat %s', image)
1979
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1980
                                                       constants.GANETI_RUNAS,
1981
                                                       destcmd)
1982
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1983
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1984
      import_env['IMPORT_INDEX'] = str(idx)
1985
      result = utils.RunCmd(command, env=import_env)
1986
      if result.failed:
1987
        logging.error("Disk import command '%s' returned error: %s"
1988
                      " output: %s", command, result.fail_reason,
1989
                      result.output)
1990
        final_result.append("error importing disk %d: %s, %s" %
1991
                            (idx, result.fail_reason, result.output[-100]))
1992

    
1993
  if final_result:
1994
    _Fail("; ".join(final_result), log=False)
1995
  return True, None
1996

    
1997

    
1998
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  # without an export directory there is nothing to list
  if not os.path.isdir(constants.EXPORT_DIR):
    _Fail("No exports directory")
  return True, utils.ListVisibleFiles(constants.EXPORT_DIR)


def RemoveExport(export):
2012
  """Remove an existing export from the node.
2013

2014
  @type export: str
2015
  @param export: the name of the export to remove
2016
  @rtype: boolean
2017
  @return: the success of the operation
2018

2019
  """
2020
  target = os.path.join(constants.EXPORT_DIR, export)
2021

    
2022
  try:
2023
    shutil.rmtree(target)
2024
  except EnvironmentError, err:
2025
    _Fail("Error while removing the export: %s", err, exc=True)
2026

    
2027
  return True, None
2028

    
2029

    
2030
def BlockdevRename(devlist):
2031
  """Rename a list of block devices.
2032

2033
  @type devlist: list of tuples
2034
  @param devlist: list of tuples of the form  (disk,
2035
      new_logical_id, new_physical_id); disk is an
2036
      L{objects.Disk} object describing the current disk,
2037
      and new logical_id/physical_id is the name we
2038
      rename it to
2039
  @rtype: boolean
2040
  @return: True if all renames succeeded, False otherwise
2041

2042
  """
2043
  msgs = []
2044
  result = True
2045
  for disk, unique_id in devlist:
2046
    dev = _RecursiveFindBD(disk)
2047
    if dev is None:
2048
      msgs.append("Can't find device %s in rename" % str(disk))
2049
      result = False
2050
      continue
2051
    try:
2052
      old_rpath = dev.dev_path
2053
      dev.Rename(unique_id)
2054
      new_rpath = dev.dev_path
2055
      if old_rpath != new_rpath:
2056
        DevCacheManager.RemoveCache(old_rpath)
2057
        # FIXME: we should add the new cache information here, like:
2058
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2059
        # but we don't have the owner here - maybe parse from existing
2060
        # cache? for now, we only lose lvm data when we rename, which
2061
        # is less critical than DRBD or MD
2062
    except errors.BlockDeviceError, err:
2063
      msgs.append("Can't rename device '%s' to '%s': %s" %
2064
                  (dev, unique_id, err))
2065
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2066
      result = False
2067
  if not result:
2068
    _Fail("; ".join(msgs))
2069
  return True, None
2070

    
2071

    
2072
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks wheter the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = os.path.normpath(cfg.GetFileStorageDir())
  # os.path.commonprefix works character-wise, so e.g. "/srv/ganeti2"
  # would wrongly pass a check against base "/srv/ganeti"; require the
  # path to be the base itself or to start with "<base>/" instead
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir


def CreateFileStorageDir(file_storage_dir):
2096
  """Create file storage directory.
2097

2098
  @type file_storage_dir: str
2099
  @param file_storage_dir: directory to create
2100

2101
  @rtype: tuple
2102
  @return: tuple with first element a boolean indicating wheter dir
2103
      creation was successful or not
2104

2105
  """
2106
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2107
  if os.path.exists(file_storage_dir):
2108
    if not os.path.isdir(file_storage_dir):
2109
      _Fail("Specified storage dir '%s' is not a directory",
2110
            file_storage_dir)
2111
  else:
2112
    try:
2113
      os.makedirs(file_storage_dir, 0750)
2114
    except OSError, err:
2115
      _Fail("Cannot create file storage directory '%s': %s",
2116
            file_storage_dir, err, exc=True)
2117
  return True, None
2118

    
2119

    
2120
def RemoveFileStorageDir(file_storage_dir):
2121
  """Remove file storage directory.
2122

2123
  Remove it only if it's empty. If not log an error and return.
2124

2125
  @type file_storage_dir: str
2126
  @param file_storage_dir: the directory we should cleanup
2127
  @rtype: tuple (success,)
2128
  @return: tuple of one element, C{success}, denoting
2129
      whether the operation was successfull
2130

2131
  """
2132
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2133
  if os.path.exists(file_storage_dir):
2134
    if not os.path.isdir(file_storage_dir):
2135
      _Fail("Specified Storage directory '%s' is not a directory",
2136
            file_storage_dir)
2137
    # deletes dir only if empty, otherwise we want to fail the rpc call
2138
    try:
2139
      os.rmdir(file_storage_dir)
2140
    except OSError, err:
2141
      _Fail("Cannot remove file storage directory '%s': %s",
2142
            file_storage_dir, err)
2143

    
2144
  return True, None
2145

    
2146

    
2147
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2148
  """Rename the file storage directory.
2149

2150
  @type old_file_storage_dir: str
2151
  @param old_file_storage_dir: the current path
2152
  @type new_file_storage_dir: str
2153
  @param new_file_storage_dir: the name we should rename to
2154
  @rtype: tuple (success,)
2155
  @return: tuple of one element, C{success}, denoting
2156
      whether the operation was successful
2157

2158
  """
2159
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2160
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2161
  if not os.path.exists(new_file_storage_dir):
2162
    if os.path.isdir(old_file_storage_dir):
2163
      try:
2164
        os.rename(old_file_storage_dir, new_file_storage_dir)
2165
      except OSError, err:
2166
        _Fail("Cannot rename '%s' to '%s': %s",
2167
              old_file_storage_dir, new_file_storage_dir, err)
2168
    else:
2169
      _Fail("Specified storage dir '%s' is not a directory",
2170
            old_file_storage_dir)
2171
  else:
2172
    if os.path.exists(old_file_storage_dir):
2173
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2174
            old_file_storage_dir, new_file_storage_dir)
2175
  return True, None
2176

    
2177

    
2178
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # os.path.commonprefix compares character-wise, so a sibling such as
  # "<queue_dir>2/job" would wrongly be accepted; check for a real
  # path-component prefix instead
  result = os.path.normpath(file_name).startswith(queue_dir + os.sep)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  # refuse to touch anything outside the queue directory
  _EnsureJobQueueFile(file_name)
  # write and replace the file atomically; the content arrives
  # (possibly) compressed over the wire
  utils.WriteFile(file_name, data=_Decompress(content))
  return True, None


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  # both source and destination must live inside the queue directory
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)

  return True, None


def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: truple
  @return: always True, None
  @warning: the function always returns True

  """
  # the drain state is represented by the mere existence of the flag file
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

  return True, None


def BlockdevClose(instance_name, disks):
2259
  """Closes the given block devices.
2260

2261
  This means they will be switched to secondary mode (in case of
2262
  DRBD).
2263

2264
  @param instance_name: if the argument is not empty, the symlinks
2265
      of this instance will be removed
2266
  @type disks: list of L{objects.Disk}
2267
  @param disks: the list of disks to be closed
2268
  @rtype: tuple (success, message)
2269
  @return: a tuple of success and message, where success
2270
      indicates the succes of the operation, and message
2271
      which will contain the error details in case we
2272
      failed
2273

2274
  """
2275
  bdevs = []
2276
  for cf in disks:
2277
    rd = _RecursiveFindBD(cf)
2278
    if rd is None:
2279
      _Fail("Can't find device %s", cf)
2280
    bdevs.append(rd)
2281

    
2282
  msg = []
2283
  for rd in bdevs:
2284
    try:
2285
      rd.Close()
2286
    except errors.BlockDeviceError, err:
2287
      msg.append(str(err))
2288
  if msg:
2289
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2290
  else:
2291
    if instance_name:
2292
      _RemoveBlockDevLinks(instance_name, disks)
2293
    return (True, "All devices secondary")
2294

    
2295

    
2296
def ValidateHVParams(hvname, hvparams):
2297
  """Validates the given hypervisor parameters.
2298

2299
  @type hvname: string
2300
  @param hvname: the hypervisor name
2301
  @type hvparams: dict
2302
  @param hvparams: the hypervisor parameters to be validated
2303
  @rtype: tuple (success, message)
2304
  @return: a tuple of success and message, where success
2305
      indicates the succes of the operation, and message
2306
      which will contain the error details in case we
2307
      failed
2308

2309
  """
2310
  try:
2311
    hv_type = hypervisor.GetHypervisor(hvname)
2312
    hv_type.ValidateParameters(hvparams)
2313
    return (True, "Validation passed")
2314
  except errors.HypervisorError, err:
2315
    _Fail(str(err), log=False)
2316

    
2317

    
2318
def DemoteFromMC():
2319
  """Demotes the current node from master candidate role.
2320

2321
  """
2322
  # try to ensure we're not the master by mistake
2323
  master, myself = ssconf.GetMasterAndMyself()
2324
  if master == myself:
2325
    _Fail("ssconf status shows I'm the master node, will not demote")
2326
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2327
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2328
    _Fail("The master daemon is running, will not demote")
2329
  try:
2330
    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2331
  except EnvironmentError, err:
2332
    if err.errno != errno.ENOENT:
2333
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2334
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2335
  return (True, "Done")
2336

    
2337

    
2338
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID on every disk first
  my_name = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(my_name, nodes_ip)

  # then resolve each of them to an actual block device
  bdevs = []
  for disk in disks:
    rdev = _RecursiveFindBD(disk)
    if rdev is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(rdev)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
2358
  """Disconnects the network on a list of drbd devices.
2359

2360
  """
2361
  bdevs = _FindDisks(nodes_ip, disks)
2362

    
2363
  # disconnect disks
2364
  for rd in bdevs:
2365
    try:
2366
      rd.DisconnectNet()
2367
    except errors.BlockDeviceError, err:
2368
      _Fail("Can't change network configuration to standalone mode: %s",
2369
            err, exc=True)
2370
  return (True, "All disks are now disconnected")
2371

    
2372

    
2373
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2374
  """Attaches the network on a list of drbd devices.
2375

2376
  """
2377
  bdevs = _FindDisks(nodes_ip, disks)
2378

    
2379
  if multimaster:
2380
    for idx, rd in enumerate(bdevs):
2381
      try:
2382
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2383
      except EnvironmentError, err:
2384
        _Fail("Can't create symlink: %s", err)
2385
  # reconnect disks, switch to new master configuration and if
2386
  # needed primary mode
2387
  for rd in bdevs:
2388
    try:
2389
      rd.AttachNet(multimaster)
2390
    except errors.BlockDeviceError, err:
2391
      _Fail("Can't change network configuration: %s", err)
2392
  # wait until the disks are connected; we need to retry the re-attach
2393
  # if the device becomes standalone, as this might happen if the one
2394
  # node disconnects and reconnects in a different mode before the
2395
  # other node reconnects; in this case, one or both of the nodes will
2396
  # decide it has wrong configuration and switch to standalone
2397
  RECONNECT_TIMEOUT = 2 * 60
2398
  sleep_time = 0.100 # start with 100 miliseconds
2399
  timeout_limit = time.time() + RECONNECT_TIMEOUT
2400
  while time.time() < timeout_limit:
2401
    all_connected = True
2402
    for rd in bdevs:
2403
      stats = rd.GetProcStatus()
2404
      if not (stats.is_connected or stats.is_in_resync):
2405
        all_connected = False
2406
      if stats.is_standalone:
2407
        # peer had different config info and this node became
2408
        # standalone, even though this should not happen with the
2409
        # new staged way of changing disk configs
2410
        try:
2411
          rd.ReAttachNet(multimaster)
2412
        except errors.BlockDeviceError, err:
2413
          _Fail("Can't change network configuration: %s", err)
2414
    if all_connected:
2415
      break
2416
    time.sleep(sleep_time)
2417
    sleep_time = min(5, sleep_time * 1.5)
2418
  if not all_connected:
2419
    _Fail("Timeout in disk reconnecting")
2420
  if multimaster:
2421
    # change to primary mode
2422
    for rd in bdevs:
2423
      try:
2424
        rd.Open()
2425
      except errors.BlockDeviceError, err:
2426
        _Fail("Can't change to primary mode: %s", err)
2427
  if multimaster:
2428
    msg = "multi-master and primary"
2429
  else:
2430
    msg = "single-master"
2431
  return (True, "Disks are now configured as %s" % msg)
2432

    
2433

    
2434
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rdev in bdevs:
    stats = rdev.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      _Fail("DRBD device %s is not in sync: stats=%s", rdev, stats)
    if stats.is_in_resync:
      alldone = False
    if stats.sync_percent is not None:
      # track the least-synchronized device
      min_resync = min(min_resync, stats.sync_percent)

  return (True, (alldone, min_resync))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent: report back immediately, the child does the work
    return (True, "Reboot scheduled in 5 seconds")
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
2474
  """Hook runner.
2475

2476
  This class is instantiated on the node side (ganeti-noded) and not
2477
  on the master side.
2478

2479
  """
2480
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2481

    
2482
  def __init__(self, hooks_base_dir=None):
2483
    """Constructor for hooks runner.
2484

2485
    @type hooks_base_dir: str or None
2486
    @param hooks_base_dir: if not None, this overrides the
2487
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
2488

2489
    """
2490
    if hooks_base_dir is None:
2491
      hooks_base_dir = constants.HOOKS_BASE_DIR
2492
    self._BASE_DIR = hooks_base_dir
2493

    
2494
  @staticmethod
2495
  def ExecHook(script, env):
2496
    """Exec one hook script.
2497

2498
    @type script: str
2499
    @param script: the full path to the script
2500
    @type env: dict
2501
    @param env: the environment with which to exec the script
2502
    @rtype: tuple (success, message)
2503
    @return: a tuple of success and message, where success
2504
        indicates the succes of the operation, and message
2505
        which will contain the error details in case we
2506
        failed
2507

2508
    """
2509
    # exec the process using subprocess and log the output
2510
    fdstdin = None
2511
    try:
2512
      fdstdin = open("/dev/null", "r")
2513
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2514
                               stderr=subprocess.STDOUT, close_fds=True,
2515
                               shell=False, cwd="/", env=env)
2516
      output = ""
2517
      try:
2518
        output = child.stdout.read(4096)
2519
        child.stdout.close()
2520
      except EnvironmentError, err:
2521
        output += "Hook script error: %s" % str(err)
2522

    
2523
      while True:
2524
        try:
2525
          result = child.wait()
2526
          break
2527
        except EnvironmentError, err:
2528
          if err.errno == errno.EINTR:
2529
            continue
2530
          raise
2531
    finally:
2532
      # try not to leak fds
2533
      for fd in (fdstdin, ):
2534
        if fd is not None:
2535
          try:
2536
            fd.close()
2537
          except EnvironmentError, err:
2538
            # just log the error
2539
            #logging.exception("Error while closing fd %s", fd)
2540
            pass
2541

    
2542
    return result == 0, utils.SafeEncode(output.strip())
2543

    
2544
  def RunHooks(self, hpath, phase, env):
2545
    """Run the scripts in the hooks directory.
2546

2547
    @type hpath: str
2548
    @param hpath: the path to the hooks directory which
2549
        holds the scripts
2550
    @type phase: str
2551
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
2552
        L{constants.HOOKS_PHASE_POST}
2553
    @type env: dict
2554
    @param env: dictionary with the environment for the hook
2555
    @rtype: list
2556
    @return: list of 3-element tuples:
2557
      - script path
2558
      - script result, either L{constants.HKR_SUCCESS} or
2559
        L{constants.HKR_FAIL}
2560
      - output of the script
2561

2562
    @raise errors.ProgrammerError: for invalid input
2563
        parameters
2564

2565
    """
2566
    if phase == constants.HOOKS_PHASE_PRE:
2567
      suffix = "pre"
2568
    elif phase == constants.HOOKS_PHASE_POST:
2569
      suffix = "post"
2570
    else:
2571
      _Fail("Unknown hooks phase '%s'", phase)
2572

    
2573
    rr = []
2574

    
2575
    subdir = "%s-%s.d" % (hpath, suffix)
2576
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2577
    try:
2578
      dir_contents = utils.ListVisibleFiles(dir_name)
2579
    except OSError, err:
2580
      # FIXME: must log output in case of failures
2581
      return True, rr
2582

    
2583
    # we use the standard python sort order,
2584
    # so 00name is the recommended naming scheme
2585
    dir_contents.sort()
2586
    for relname in dir_contents:
2587
      fname = os.path.join(dir_name, relname)
2588
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2589
          self.RE_MASK.match(relname) is not None):
2590
        rrval = constants.HKR_SKIP
2591
        output = ""
2592
      else:
2593
        result, output = self.ExecHook(fname, env)
2594
        if not result:
2595
          rrval = constants.HKR_FAIL
2596
        else:
2597
          rrval = constants.HKR_SUCCESS
2598
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
2599

    
2600
    return True, rr
2601

    
2602

    
2603
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    script_path = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                 os.path.isfile)
    if script_path is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # the input data is passed via a temporary file, removed afterwards
    fd, input_file = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([script_path, input_file])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(input_file)

    return True, result.stdout


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError:
      # the cache is best-effort only, so just log and continue
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError:
      # the cache is best-effort only, so just log and continue
      logging.exception("Can't remove bdev cache for %s", dev_path)