Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 7a0156dc

History | View | Annotate | Download (83.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26

27
"""
28

    
29

    
30
import os
31
import os.path
32
import shutil
33
import time
34
import stat
35
import errno
36
import re
37
import subprocess
38
import random
39
import logging
40
import tempfile
41
import zlib
42
import base64
43

    
44
from ganeti import errors
45
from ganeti import utils
46
from ganeti import ssh
47
from ganeti import hypervisor
48
from ganeti import constants
49
from ganeti import bdev
50
from ganeti import objects
51
from ganeti import ssconf
52

    
53

    
54
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55

    
56

    
57
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message. Raised via L{_Fail}; the ganeti
  node daemon handles it specially and turns it into a 'failed' RPC
  result for the master.

  """
63

    
64

    
65
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  The exception is handled specially in the ganeti daemon and turned
  into a 'failed' return type, so this is a convenient shortcut for
  logging an error and propagating it to the master daemon.

  @type msg: string
  @param msg: the text of the exception; %-formatted with C{args} if
      any positional arguments are given
  @keyword log: whether to log the error (default: True)
  @keyword exc: whether to log the current exception traceback
  @raise RPCFail: always

  """
  if args:
    msg = msg % args
  # logging can be suppressed by passing log=False
  if kwargs.get("log", True):
    if kwargs.get("exc"):
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
86

    
87

    
88
def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a freshly-constructed SimpleStore instance

  """
  store = ssconf.SimpleStore()
  return store
96

    
97

    
98
def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a freshly-constructed SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
109

    
110

    
111
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: a two-element (encoding, content) pair as sent by the
      RPC client
  @rtype: str
  @return: Decompressed data
  @raise AssertionError: if the encoding marker is unknown

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  # guard-clause dispatch on the encoding marker
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
129

    
130

    
131
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  Symlinks and subdirectories are left alone; only plain files that
  are not in the exclusion list are deleted.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    excluded = frozenset()
  else:
    # normalize excluded paths so the membership test below matches
    excluded = frozenset(os.path.normpath(i) for i in exclude)

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in excluded:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
155

    
156

    
157
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  @rtype: frozenset
  @return: the set of file names UploadFile may accept

  """
  allowed = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.HMAC_CLUSTER_KEY,
    ])

  # each hypervisor may declare extra ancillary files
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed)
178

    
179

    
180
# Computed once at import time; see the module docstring and
# _BuildUploadFileList for the contents
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181

    
182

    
183
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  Cleans the queue directory (keeping the queue lock file, which must
  survive the purge) and the job archive directory.

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192

    
193

    
194
def GetMasterInfo():
195
  """Returns master information.
196

197
  This is an utility function to compute master information, either
198
  for consumption here or from the node daemon.
199

200
  @rtype: tuple
201
  @return: master_netdev, master_ip, master_name
202
  @raise RPCFail: in case of errors
203

204
  """
205
  try:
206
    cfg = _GetConfig()
207
    master_netdev = cfg.GetMasterNetdev()
208
    master_ip = cfg.GetMasterIP()
209
    master_node = cfg.GetMasterNode()
210
  except errors.ConfigurationError, err:
211
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
212
  return (master_netdev, master_ip, master_node)
213

    
214

    
215
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None
  @raise RPCFail: if any of the activation steps failed

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  # errors are collected and reported together at the end, so that as
  # many steps as possible are attempted
  err_msgs = []
  # if the master IP already answers, either we hold it (fine) or a
  # rival node does (error)
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      msg = "Someone else has the master ip, not activating"
      logging.error(msg)
      err_msgs.append(msg)
  else:
    # add the master IP as a labelled address on the master netdev
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      msg = "Can't activate master IP: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

    # advertise the (re)located IP to the neighbours; best-effort only
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    # maps each daemon name to its extra command line arguments
    daemons_params = {
        'ganeti-masterd': [],
        'ganeti-rapi': [],
        }
    if no_voting:
      daemons_params['ganeti-masterd'].append('--no-voting')
      daemons_params['ganeti-masterd'].append('--yes-do-it')
    for daemon in daemons_params:
      cmd = [daemon]
      cmd.extend(daemons_params[daemon])
      result = utils.RunCmd(cmd)
      if result.failed:
        msg = "Can't start daemon %s: %s" % (daemon, result.output)
        logging.error(msg)
        err_msgs.append(msg)

  if err_msgs:
    _Fail("; ".join(err_msgs))
276

    
277

    
278
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  netdev, ip_address, _ = GetMasterInfo()

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % ip_address,
                         "dev", netdev])
  if result.failed:
    # log the failure, but otherwise ignore it
    logging.error("Can't remove the master IP, error: %s", result.output)

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in (constants.RAPI, constants.MASTERD):
      pid = utils.ReadPidFile(utils.DaemonPidFileName(daemon))
      utils.KillProcess(pid)
307

    
308

    
309
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
310
  """Joins this node to the cluster.
311

312
  This does the following:
313
      - updates the hostkeys of the machine (rsa and dsa)
314
      - adds the ssh private key to the user
315
      - adds the ssh public key to the users' authorized_keys file
316

317
  @type dsa: str
318
  @param dsa: the DSA private key to write
319
  @type dsapub: str
320
  @param dsapub: the DSA public key to write
321
  @type rsa: str
322
  @param rsa: the RSA private key to write
323
  @type rsapub: str
324
  @param rsapub: the RSA public key to write
325
  @type sshkey: str
326
  @param sshkey: the SSH private key to write
327
  @type sshpub: str
328
  @param sshpub: the SSH public key to write
329
  @rtype: boolean
330
  @return: the success of the operation
331

332
  """
333
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
334
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
335
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
336
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
337
  for name, content, mode in sshd_keys:
338
    utils.WriteFile(name, data=content, mode=mode)
339

    
340
  try:
341
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
342
                                                    mkdir=True)
343
  except errors.OpExecError, err:
344
    _Fail("Error while processing user ssh files: %s", err, exc=True)
345

    
346
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
347
    utils.WriteFile(name, data=content, mode=0600)
348

    
349
  utils.AddAuthorizedKey(auth_keys, sshpub)
350

    
351
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352

    
353

    
354
def LeaveCluster():
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
    # drop our own public key from authorized_keys, then delete the
    # key pair itself
    utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
    utils.RemoveFile(priv_key)
    utils.RemoveFile(pub_key)
  except errors.OpExecError:
    # best-effort: log and continue with the shutdown
    logging.exception("Error while processing ssh files")

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
380

    
381

    
382
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  vginfo = _GetVGInfo(vgname)
  info = {
    "vg_size": vginfo["vg_size"],
    "vg_free": vginfo["vg_free"],
    }

  # merge in the hypervisor-provided data, if any
  hv_info = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hv_info is not None:
    info.update(hv_info)

  # the boot id changes on every reboot, so it can be used to detect
  # node restarts
  info["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return info
412

    
413

    
414
def VerifyNode(what, cluster_name):
415
  """Verify the status of the local node.
416

417
  Based on the input L{what} parameter, various checks are done on the
418
  local node.
419

420
  If the I{filelist} key is present, this list of
421
  files is checksummed and the file/checksum pairs are returned.
422

423
  If the I{nodelist} key is present, we check that we have
424
  connectivity via ssh with the target nodes (and check the hostname
425
  report).
426

427
  If the I{node-net-test} key is present, we check that we have
428
  connectivity to the given nodes via both primary IP and, if
429
  applicable, secondary IPs.
430

431
  @type what: C{dict}
432
  @param what: a dictionary of things to check:
433
      - filelist: list of files for which to compute checksums
434
      - nodelist: list of nodes we should check ssh communication with
435
      - node-net-test: list of nodes we should check node daemon port
436
        connectivity with
437
      - hypervisor: list with hypervisors to run the verify for
438
  @rtype: dict
439
  @return: a dictionary with the same keys as the input dict, and
440
      values representing the result of the checks
441

442
  """
443
  result = {}
444

    
445
  if constants.NV_HYPERVISOR in what:
446
    result[constants.NV_HYPERVISOR] = tmp = {}
447
    for hv_name in what[constants.NV_HYPERVISOR]:
448
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
449

    
450
  if constants.NV_FILELIST in what:
451
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
452
      what[constants.NV_FILELIST])
453

    
454
  if constants.NV_NODELIST in what:
455
    result[constants.NV_NODELIST] = tmp = {}
456
    random.shuffle(what[constants.NV_NODELIST])
457
    for node in what[constants.NV_NODELIST]:
458
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
459
      if not success:
460
        tmp[node] = message
461

    
462
  if constants.NV_NODENETTEST in what:
463
    result[constants.NV_NODENETTEST] = tmp = {}
464
    my_name = utils.HostInfo().name
465
    my_pip = my_sip = None
466
    for name, pip, sip in what[constants.NV_NODENETTEST]:
467
      if name == my_name:
468
        my_pip = pip
469
        my_sip = sip
470
        break
471
    if not my_pip:
472
      tmp[my_name] = ("Can't find my own primary/secondary IP"
473
                      " in the node list")
474
    else:
475
      port = utils.GetDaemonPort(constants.NODED)
476
      for name, pip, sip in what[constants.NV_NODENETTEST]:
477
        fail = []
478
        if not utils.TcpPing(pip, port, source=my_pip):
479
          fail.append("primary")
480
        if sip != pip:
481
          if not utils.TcpPing(sip, port, source=my_sip):
482
            fail.append("secondary")
483
        if fail:
484
          tmp[name] = ("failure using the %s interface(s)" %
485
                       " and ".join(fail))
486

    
487
  if constants.NV_LVLIST in what:
488
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
489

    
490
  if constants.NV_INSTANCELIST in what:
491
    result[constants.NV_INSTANCELIST] = GetInstanceList(
492
      what[constants.NV_INSTANCELIST])
493

    
494
  if constants.NV_VGLIST in what:
495
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
496

    
497
  if constants.NV_VERSION in what:
498
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
499
                                    constants.RELEASE_VERSION)
500

    
501
  if constants.NV_HVINFO in what:
502
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
503
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
504

    
505
  if constants.NV_DRBDLIST in what:
506
    try:
507
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
508
    except errors.BlockDeviceError, err:
509
      logging.warning("Can't get used minors list", exc_info=True)
510
      used_minors = str(err)
511
    result[constants.NV_DRBDLIST] = used_minors
512

    
513
  return result
514

    
515

    
516
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  # raw string, so the backslash escapes reach the regex engine
  # unchanged instead of relying on Python keeping unknown escapes
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr is a fixed-width flag string; see lvs(8) for the meaning
    # of the individual positions
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[name] = (size, inactive, online)

  return lvs
558

    
559

    
560
def ListVolumeGroups():
  """List the volume groups and their size.

  Thin wrapper over L{utils.ListVolumeGroups}, exported for RPC use.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
569

    
570

    
571
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    # the devices column may carry a parenthesised suffix after the
    # device name; splitting on '(' strips it (and is a no-op when the
    # suffix is absent)
    return dev.split('(')[0]

  def map_line(fields):
    return {
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': parse_dev(fields[2].strip()),
      'vg': fields[3].strip(),
    }

  # only lines with all four columns are considered valid
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
613

    
614

    
615
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise
  @raise RPCFail: if any bridge is missing

  """
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]

  if missing:
    _Fail("Missing bridges %s", ", ".join(missing))
629

    
630

    
631
def GetInstanceList(hypervisor_list):
632
  """Provides a list of instances.
633

634
  @type hypervisor_list: list
635
  @param hypervisor_list: the list of hypervisors to query information
636

637
  @rtype: list
638
  @return: a list of all running instances on the current node
639
    - instance1.example.com
640
    - instance2.example.com
641

642
  """
643
  results = []
644
  for hname in hypervisor_list:
645
    try:
646
      names = hypervisor.GetHypervisor(hname).ListInstances()
647
      results.extend(names)
648
    except errors.HypervisorError, err:
649
      _Fail("Error enumerating instances (hypervisor %s): %s",
650
            hname, err, exc=True)
651

    
652
  return results
653

    
654

    
655
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  # an unknown instance yields an empty dict
  if iinfo is None:
    return {}
  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
679

    
680

    
681
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant
  @raise RPCFail: if the instance is not running or a disk symlink
      is missing

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # every disk must have its symlink; a missing one means the instance
  # has not been (re)started by a symlink-aware version of ganeti
  for idx, _ in enumerate(instance.disks):
    if not os.path.islink(_GetBlockDevSymlinkPath(iname, idx)):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
702

    
703

    
704
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if not iinfo:
      continue
    for name, _, memory, vcpus, state, times in iinfo:
      value = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }
      if name in output:
        # only the static parameters (memory, vcpus) have to agree
        # between hypervisors; state and time can legitimately differ
        # between the invocations
        for key in ('memory', 'vcpus'):
          if value[key] != output[name][key]:
            _Fail("Instance %s is running twice"
                  " with different parameters", name)
      output[name] = value

  return output
745

    
746

    
747
def InstanceOsAdd(instance, reinstall):
  """Add an OS to an instance.

  Runs the OS definition's create script, logging its output to a
  per-operation log file.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @rtype: None
  @raise RPCFail: if the create script fails

  """
  inst_os = OSFromDisk(instance.os)

  env = OSEnvironment(instance, inst_os)
  if reinstall:
    env['INSTANCE_REINSTALL'] = "1"

  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=env,
                        cwd=inst_os.path, output=logfile)
  if not result.failed:
    return

  logging.error("os create command '%s' returned error: %s, logfile: %s,"
                " output: %s", result.cmd, result.fail_reason, logfile,
                result.output)
  # include the tail of the log file in the error reported back
  lines = [utils.SafeEncode(val)
           for val in utils.TailFile(logfile, lines=20)]
  _Fail("OS create script failed (%s), last lines in the"
        " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
776

    
777

    
778
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: boolean
  @return: the success of the operation
  @raise RPCFail: if the rename script fails

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # message fixed from "os create command", a copy/paste leftover
    # from InstanceOsAdd
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
808

    
809

    
810
def _GetVGInfo(vg_name):
811
  """Get information about the volume group.
812

813
  @type vg_name: str
814
  @param vg_name: the volume group which we query
815
  @rtype: dict
816
  @return:
817
    A dictionary with the following keys:
818
      - C{vg_size} is the total size of the volume group in MiB
819
      - C{vg_free} is the free size of the volume group in MiB
820
      - C{pv_count} are the number of physical disks in that VG
821

822
    If an error occurs during gathering of data, we return the same dict
823
    with keys all set to None.
824

825
  """
826
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
827

    
828
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
829
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
830

    
831
  if retval.failed:
832
    logging.error("volume group %s not present", vg_name)
833
    return retdic
834
  valarr = retval.stdout.strip().rstrip(':').split(':')
835
  if len(valarr) == 3:
836
    try:
837
      retdic = {
838
        "vg_size": int(round(float(valarr[0]), 0)),
839
        "vg_free": int(round(float(valarr[1]), 0)),
840
        "pv_count": int(valarr[2]),
841
        }
842
    except ValueError, err:
843
      logging.exception("Fail to parse vgs output: %s", err)
844
  else:
845
    logging.error("vgs output has the wrong number of fields (expected"
846
                  " three): %s", str(valarr))
847
  return retdic
848

    
849

    
850
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for an instance's disk.

  @type instance_name: str
  @param instance_name: the name of the instance
  @type idx: int
  @param idx: the index of the disk
  @rtype: str
  @return: path under DISK_LINKS_DIR, of the form "instance_name:idx"

  """
  return os.path.join(constants.DISK_LINKS_DIR,
                      "%s:%d" % (instance_name, idx))
853

    
854

    
855
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).


  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink
  @raise OSError: if the symlink cannot be created or replaced

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      # something already exists at the link path: keep it only if it
      # is a symlink already pointing at the right device, otherwise
      # replace it
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name
881

    
882

    
883
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Removal errors are logged but otherwise ignored.

  """
  for idx, _ in enumerate(disks):
    link = _GetBlockDevSymlinkPath(instance_name, idx)
    if not os.path.islink(link):
      continue
    try:
      os.remove(link)
    except OSError:
      logging.exception("Can't remove symlink '%s'", link)
894

    
895

    
896
def _GatherAndLinkBlockDevs(instance):
897
  """Set up an instance's block device(s).
898

899
  This is run on the primary node at instance startup. The block
900
  devices must be already assembled.
901

902
  @type instance: L{objects.Instance}
903
  @param instance: the instance whose disks we shoul assemble
904
  @rtype: list
905
  @return: list of (disk_object, device_path)
906

907
  """
908
  block_devices = []
909
  for idx, disk in enumerate(instance.disks):
910
    device = _RecursiveFindBD(disk)
911
    if device is None:
912
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
913
                                    str(disk))
914
    device.Open()
915
    try:
916
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
917
    except OSError, e:
918
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
919
                                    e.strerror)
920

    
921
    block_devices.append((disk, link_name))
922

    
923
  return block_devices
924

    
925

    
926
def StartInstance(instance):
927
  """Start an instance.
928

929
  @type instance: L{objects.Instance}
930
  @param instance: the instance object
931
  @rtype: None
932

933
  """
934
  running_instances = GetInstanceList([instance.hypervisor])
935

    
936
  if instance.name in running_instances:
937
    logging.info("Instance %s already running, not starting", instance.name)
938
    return
939

    
940
  try:
941
    block_devices = _GatherAndLinkBlockDevs(instance)
942
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
943
    hyper.StartInstance(instance, block_devices)
944
  except errors.BlockDeviceError, err:
945
    _Fail("Block device error: %s", err, exc=True)
946
  except errors.HypervisorError, err:
947
    _RemoveBlockDevLinks(instance.name, instance.disks)
948
    _Fail("Hypervisor error: %s", err, exc=True)
949

    
950

    
951
def InstanceShutdown(instance):
952
  """Shut an instance down.
953

954
  @note: this functions uses polling with a hardcoded timeout.
955

956
  @type instance: L{objects.Instance}
957
  @param instance: the instance object
958
  @rtype: None
959

960
  """
961
  hv_name = instance.hypervisor
962
  running_instances = GetInstanceList([hv_name])
963
  iname = instance.name
964

    
965
  if iname not in running_instances:
966
    logging.info("Instance %s not running, doing nothing", iname)
967
    return
968

    
969
  hyper = hypervisor.GetHypervisor(hv_name)
970
  try:
971
    hyper.StopInstance(instance)
972
  except errors.HypervisorError, err:
973
    _Fail("Failed to stop instance %s: %s", iname, err)
974

    
975
  # test every 10secs for 2min
976

    
977
  time.sleep(1)
978
  for _ in range(11):
979
    if instance.name not in GetInstanceList([hv_name]):
980
      break
981
    time.sleep(10)
982
  else:
983
    # the shutdown did not succeed
984
    logging.error("Shutdown of '%s' unsuccessful, using destroy", iname)
985

    
986
    try:
987
      hyper.StopInstance(instance, force=True)
988
    except errors.HypervisorError, err:
989
      _Fail("Failed to force stop instance %s: %s", iname, err)
990

    
991
    time.sleep(1)
992
    if instance.name in GetInstanceList([hv_name]):
993
      _Fail("Could not shutdown instance %s even by destroy", iname)
994

    
995
  _RemoveBlockDevLinks(iname, instance.disks)
996

    
997

    
998
def InstanceReboot(instance, reboot_type):
999
  """Reboot an instance.
1000

1001
  @type instance: L{objects.Instance}
1002
  @param instance: the instance object to reboot
1003
  @type reboot_type: str
1004
  @param reboot_type: the type of reboot, one the following
1005
    constants:
1006
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1007
        instance OS, do not recreate the VM
1008
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1009
        restart the VM (at the hypervisor level)
1010
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1011
        not accepted here, since that mode is handled differently, in
1012
        cmdlib, and translates into full stop and start of the
1013
        instance (instead of a call_instance_reboot RPC)
1014
  @rtype: None
1015

1016
  """
1017
  running_instances = GetInstanceList([instance.hypervisor])
1018

    
1019
  if instance.name not in running_instances:
1020
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1021

    
1022
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1023
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1024
    try:
1025
      hyper.RebootInstance(instance)
1026
    except errors.HypervisorError, err:
1027
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1028
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1029
    try:
1030
      InstanceShutdown(instance)
1031
      return StartInstance(instance)
1032
    except errors.HypervisorError, err:
1033
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1034
  else:
1035
    _Fail("Invalid reboot_type received: %s", reboot_type)
1036

    
1037

    
1038
def MigrationInfo(instance):
1039
  """Gather information about an instance to be migrated.
1040

1041
  @type instance: L{objects.Instance}
1042
  @param instance: the instance definition
1043

1044
  """
1045
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1046
  try:
1047
    info = hyper.MigrationInfo(instance)
1048
  except errors.HypervisorError, err:
1049
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1050
  return info
1051

    
1052

    
1053
def AcceptInstance(instance, info, target):
1054
  """Prepare the node to accept an instance.
1055

1056
  @type instance: L{objects.Instance}
1057
  @param instance: the instance definition
1058
  @type info: string/data (opaque)
1059
  @param info: migration information, from the source node
1060
  @type target: string
1061
  @param target: target host (usually ip), on this node
1062

1063
  """
1064
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1065
  try:
1066
    hyper.AcceptInstance(instance, info, target)
1067
  except errors.HypervisorError, err:
1068
    _Fail("Failed to accept instance: %s", err, exc=True)
1069

    
1070

    
1071
def FinalizeMigration(instance, info, success):
1072
  """Finalize any preparation to accept an instance.
1073

1074
  @type instance: L{objects.Instance}
1075
  @param instance: the instance definition
1076
  @type info: string/data (opaque)
1077
  @param info: migration information, from the source node
1078
  @type success: boolean
1079
  @param success: whether the migration was a success or a failure
1080

1081
  """
1082
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1083
  try:
1084
    hyper.FinalizeMigration(instance, info, success)
1085
  except errors.HypervisorError, err:
1086
    _Fail("Failed to finalize migration: %s", err, exc=True)
1087

    
1088

    
1089
def MigrateInstance(instance, target, live):
1090
  """Migrates an instance to another node.
1091

1092
  @type instance: L{objects.Instance}
1093
  @param instance: the instance definition
1094
  @type target: string
1095
  @param target: the target node name
1096
  @type live: boolean
1097
  @param live: whether the migration should be done live or not (the
1098
      interpretation of this parameter is left to the hypervisor)
1099
  @rtype: tuple
1100
  @return: a tuple of (success, msg) where:
1101
      - succes is a boolean denoting the success/failure of the operation
1102
      - msg is a string with details in case of failure
1103

1104
  """
1105
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1106

    
1107
  try:
1108
    hyper.MigrateInstance(instance.name, target, live)
1109
  except errors.HypervisorError, err:
1110
    _Fail("Failed to migrate instance: %s", err, exc=True)
1111

    
1112

    
1113
def BlockdevCreate(disk, size, owner, on_primary, info):
1114
  """Creates a block device for an instance.
1115

1116
  @type disk: L{objects.Disk}
1117
  @param disk: the object describing the disk we should create
1118
  @type size: int
1119
  @param size: the size of the physical underlying device, in MiB
1120
  @type owner: str
1121
  @param owner: the name of the instance for which disk is created,
1122
      used for device cache data
1123
  @type on_primary: boolean
1124
  @param on_primary:  indicates if it is the primary node or not
1125
  @type info: string
1126
  @param info: string that will be sent to the physical device
1127
      creation, used for example to set (LVM) tags on LVs
1128

1129
  @return: the new unique_id of the device (this can sometime be
1130
      computed only after creation), or None. On secondary nodes,
1131
      it's not required to return anything.
1132

1133
  """
1134
  clist = []
1135
  if disk.children:
1136
    for child in disk.children:
1137
      try:
1138
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1139
      except errors.BlockDeviceError, err:
1140
        _Fail("Can't assemble device %s: %s", child, err)
1141
      if on_primary or disk.AssembleOnSecondary():
1142
        # we need the children open in case the device itself has to
1143
        # be assembled
1144
        try:
1145
          crdev.Open()
1146
        except errors.BlockDeviceError, err:
1147
          _Fail("Can't make child '%s' read-write: %s", child, err)
1148
      clist.append(crdev)
1149

    
1150
  try:
1151
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1152
  except errors.BlockDeviceError, err:
1153
    _Fail("Can't create block device: %s", err)
1154

    
1155
  if on_primary or disk.AssembleOnSecondary():
1156
    try:
1157
      device.Assemble()
1158
    except errors.BlockDeviceError, err:
1159
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1160
    device.SetSyncSpeed(constants.SYNC_SPEED)
1161
    if on_primary or disk.OpenOnSecondary():
1162
      try:
1163
        device.Open(force=True)
1164
      except errors.BlockDeviceError, err:
1165
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1166
    DevCacheManager.UpdateCache(device.dev_path, owner,
1167
                                on_primary, disk.iv_name)
1168

    
1169
  device.SetInfo(info)
1170

    
1171
  return device.unique_id
1172

    
1173

    
1174
def BlockdevRemove(disk):
1175
  """Remove a block device.
1176

1177
  @note: This is intended to be called recursively.
1178

1179
  @type disk: L{objects.Disk}
1180
  @param disk: the disk object we should remove
1181
  @rtype: boolean
1182
  @return: the success of the operation
1183

1184
  """
1185
  msgs = []
1186
  try:
1187
    rdev = _RecursiveFindBD(disk)
1188
  except errors.BlockDeviceError, err:
1189
    # probably can't attach
1190
    logging.info("Can't attach to device %s in remove", disk)
1191
    rdev = None
1192
  if rdev is not None:
1193
    r_path = rdev.dev_path
1194
    try:
1195
      rdev.Remove()
1196
    except errors.BlockDeviceError, err:
1197
      msgs.append(str(err))
1198
    if not msgs:
1199
      DevCacheManager.RemoveCache(r_path)
1200

    
1201
  if disk.children:
1202
    for child in disk.children:
1203
      try:
1204
        BlockdevRemove(child)
1205
      except RPCFail, err:
1206
        msgs.append(str(err))
1207

    
1208
  if msgs:
1209
    _Fail("; ".join(msgs))
1210

    
1211

    
1212
def _RecursiveAssembleBD(disk, owner, as_primary):
1213
  """Activate a block device for an instance.
1214

1215
  This is run on the primary and secondary nodes for an instance.
1216

1217
  @note: this function is called recursively.
1218

1219
  @type disk: L{objects.Disk}
1220
  @param disk: the disk we try to assemble
1221
  @type owner: str
1222
  @param owner: the name of the instance which owns the disk
1223
  @type as_primary: boolean
1224
  @param as_primary: if we should make the block device
1225
      read/write
1226

1227
  @return: the assembled device or None (in case no device
1228
      was assembled)
1229
  @raise errors.BlockDeviceError: in case there is an error
1230
      during the activation of the children or the device
1231
      itself
1232

1233
  """
1234
  children = []
1235
  if disk.children:
1236
    mcn = disk.ChildrenNeeded()
1237
    if mcn == -1:
1238
      mcn = 0 # max number of Nones allowed
1239
    else:
1240
      mcn = len(disk.children) - mcn # max number of Nones
1241
    for chld_disk in disk.children:
1242
      try:
1243
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1244
      except errors.BlockDeviceError, err:
1245
        if children.count(None) >= mcn:
1246
          raise
1247
        cdev = None
1248
        logging.error("Error in child activation (but continuing): %s",
1249
                      str(err))
1250
      children.append(cdev)
1251

    
1252
  if as_primary or disk.AssembleOnSecondary():
1253
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1254
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1255
    result = r_dev
1256
    if as_primary or disk.OpenOnSecondary():
1257
      r_dev.Open()
1258
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1259
                                as_primary, disk.iv_name)
1260

    
1261
  else:
1262
    result = True
1263
  return result
1264

    
1265

    
1266
def BlockdevAssemble(disk, owner, as_primary):
1267
  """Activate a block device for an instance.
1268

1269
  This is a wrapper over _RecursiveAssembleBD.
1270

1271
  @rtype: str or boolean
1272
  @return: a C{/dev/...} path for primary nodes, and
1273
      C{True} for secondary nodes
1274

1275
  """
1276
  try:
1277
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1278
    if isinstance(result, bdev.BlockDev):
1279
      result = result.dev_path
1280
  except errors.BlockDeviceError, err:
1281
    _Fail("Error while assembling disk: %s", err, exc=True)
1282

    
1283
  return result
1284

    
1285

    
1286
def BlockdevShutdown(disk):
1287
  """Shut down a block device.
1288

1289
  First, if the device is assembled (Attach() is successful), then
1290
  the device is shutdown. Then the children of the device are
1291
  shutdown.
1292

1293
  This function is called recursively. Note that we don't cache the
1294
  children or such, as oppossed to assemble, shutdown of different
1295
  devices doesn't require that the upper device was active.
1296

1297
  @type disk: L{objects.Disk}
1298
  @param disk: the description of the disk we should
1299
      shutdown
1300
  @rtype: None
1301

1302
  """
1303
  msgs = []
1304
  r_dev = _RecursiveFindBD(disk)
1305
  if r_dev is not None:
1306
    r_path = r_dev.dev_path
1307
    try:
1308
      r_dev.Shutdown()
1309
      DevCacheManager.RemoveCache(r_path)
1310
    except errors.BlockDeviceError, err:
1311
      msgs.append(str(err))
1312

    
1313
  if disk.children:
1314
    for child in disk.children:
1315
      try:
1316
        BlockdevShutdown(child)
1317
      except RPCFail, err:
1318
        msgs.append(str(err))
1319

    
1320
  if msgs:
1321
    _Fail("; ".join(msgs))
1322

    
1323

    
1324
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    # at least one child device could not be found
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      # no static path; resolve the device to find its runtime path
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    # resolve the children first, since finding the parent device
    # needs them
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)


def BlockdevFind(disk):
1415
  """Check if a device is activated.
1416

1417
  If it is, return information about the real device.
1418

1419
  @type disk: L{objects.Disk}
1420
  @param disk: the disk to find
1421
  @rtype: None or objects.BlockDevStatus
1422
  @return: None if the disk cannot be found, otherwise a the current
1423
           information
1424

1425
  """
1426
  try:
1427
    rbd = _RecursiveFindBD(disk)
1428
  except errors.BlockDeviceError, err:
1429
    _Fail("Failed to find device: %s", err, exc=True)
1430

    
1431
  if rbd is None:
1432
    return None
1433

    
1434
  return rbd.GetSyncStatus()
1435

    
1436

    
1437
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      # device cannot be attached to; report unknown size
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  # only whitelisted files may be overwritten by the master
  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  @type values: dict
  @param values: the ssconf key/value pairs to write

  """
  ssconf.SimpleStore().WriteFiles(values)


def _ErrnoOrStr(err):
1557
  """Format an EnvironmentError exception.
1558

1559
  If the L{err} argument has an errno attribute, it will be looked up
1560
  and converted into a textual C{E...} description. Otherwise the
1561
  string representation of the error will be returned.
1562

1563
  @type err: L{EnvironmentError}
1564
  @param err: the exception to format
1565

1566
  """
1567
  if hasattr(err, 'errno'):
1568
    detail = errno.errorcode[err.errno]
1569
  else:
1570
    detail = str(err)
1571
  return detail
1572

    
1573

    
1574
def _OSOndiskAPIVersion(name, os_dir):
1575
  """Compute and return the API version of a given OS.
1576

1577
  This function will try to read the API version of the OS given by
1578
  the 'name' parameter and residing in the 'os_dir' directory.
1579

1580
  @type name: str
1581
  @param name: the OS name we should look for
1582
  @type os_dir: str
1583
  @param os_dir: the directory inwhich we should look for the OS
1584
  @rtype: tuple
1585
  @return: tuple (status, data) with status denoting the validity and
1586
      data holding either the vaid versions or an error message
1587

1588
  """
1589
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1590

    
1591
  try:
1592
    st = os.stat(api_file)
1593
  except EnvironmentError, err:
1594
    return False, ("Required file 'ganeti_api_version' file not"
1595
                   " found under path %s: %s" % (os_dir, _ErrnoOrStr(err)))
1596

    
1597
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1598
    return False, ("File 'ganeti_api_version' file at %s is not"
1599
                   " a regular file" % os_dir)
1600

    
1601
  try:
1602
    api_versions = utils.ReadFile(api_file).splitlines()
1603
  except EnvironmentError, err:
1604
    return False, ("Error while reading the API version file at %s: %s" %
1605
                   (api_file, _ErrnoOrStr(err)))
1606

    
1607
  try:
1608
    api_versions = [int(version.strip()) for version in api_versions]
1609
  except (TypeError, ValueError), err:
1610
    return False, ("API version(s) can't be converted to integer: %s" %
1611
                   str(err))
1612

    
1613
  return True, api_versions
1614

    
1615

    
1616
def DiagnoseOS(top_dirs=None):
1617
  """Compute the validity for all OSes.
1618

1619
  @type top_dirs: list
1620
  @param top_dirs: the list of directories in which to
1621
      search (if not given defaults to
1622
      L{constants.OS_SEARCH_PATH})
1623
  @rtype: list of L{objects.OS}
1624
  @return: a list of tuples (name, path, status, diagnose)
1625
      for all (potential) OSes under all search paths, where:
1626
          - name is the (potential) OS name
1627
          - path is the full path to the OS
1628
          - status True/False is the validity of the OS
1629
          - diagnose is the error message for an invalid OS, otherwise empty
1630

1631
  """
1632
  if top_dirs is None:
1633
    top_dirs = constants.OS_SEARCH_PATH
1634

    
1635
  result = []
1636
  for dir_name in top_dirs:
1637
    if os.path.isdir(dir_name):
1638
      try:
1639
        f_names = utils.ListVisibleFiles(dir_name)
1640
      except EnvironmentError, err:
1641
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1642
        break
1643
      for name in f_names:
1644
        os_path = os.path.sep.join([dir_name, name])
1645
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1646
        if status:
1647
          diagnose = ""
1648
        else:
1649
          diagnose = os_inst
1650
        result.append((name, os_path, status, diagnose))
1651

    
1652
  return result
1653

    
1654

    
1655
def _TryOSFromDisk(name, base_dir=None):
1656
  """Create an OS instance from disk.
1657

1658
  This function will return an OS instance if the given name is a
1659
  valid OS name.
1660

1661
  @type base_dir: string
1662
  @keyword base_dir: Base directory containing OS installations.
1663
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1664
  @rtype: tuple
1665
  @return: success and either the OS instance if we find a valid one,
1666
      or error message
1667

1668
  """
1669
  if base_dir is None:
1670
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1671
    if os_dir is None:
1672
      return False, "Directory for OS %s not found in search path" % name
1673
  else:
1674
    os_dir = os.path.sep.join([base_dir, name])
1675

    
1676
  status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1677
  if not status:
1678
    # push the error up
1679
    return status, api_versions
1680

    
1681
  if not constants.OS_API_VERSIONS.intersection(api_versions):
1682
    return False, ("API version mismatch for path '%s': found %s, want %s." %
1683
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
1684

    
1685
  # OS Scripts dictionary, we will populate it with the actual script names
1686
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1687

    
1688
  for script in os_scripts:
1689
    os_scripts[script] = os.path.sep.join([os_dir, script])
1690

    
1691
    try:
1692
      st = os.stat(os_scripts[script])
1693
    except EnvironmentError, err:
1694
      return False, ("Script '%s' under path '%s' is missing (%s)" %
1695
                     (script, os_dir, _ErrnoOrStr(err)))
1696

    
1697
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1698
      return False, ("Script '%s' under path '%s' is not executable" %
1699
                     (script, os_dir))
1700

    
1701
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1702
      return False, ("Script '%s' under path '%s' is not a regular file" %
1703
                     (script, os_dir))
1704

    
1705
  os_obj = objects.OS(name=name, path=os_dir,
1706
                      create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1707
                      export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1708
                      import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1709
                      rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1710
                      api_versions=api_versions)
1711
  return True, os_obj
1712

    
1713

    
1714
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type name: str
  @param name: the OS name to look up
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  status, payload = _TryOSFromDisk(name, base_dir)

  if not status:
    # payload is the error message in this case
    _Fail(payload)

  return payload


def OSEnvironment(instance, os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type os: L{objects.OS}
  @param os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  # expose the highest API version both sides understand
  api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # export be/hv parameters as INSTANCE_BE_* / INSTANCE_HV_*
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def BlockdevGrow(disk, amount):
1800
  """Grow a stack of block devices.
1801

1802
  This function is called recursively, with the childrens being the
1803
  first ones to resize.
1804

1805
  @type disk: L{objects.Disk}
1806
  @param disk: the disk to be grown
1807
  @rtype: (status, result)
1808
  @return: a tuple with the status of the operation
1809
      (True/False), and the errors message if status
1810
      is False
1811

1812
  """
1813
  r_dev = _RecursiveFindBD(disk)
1814
  if r_dev is None:
1815
    _Fail("Cannot find block device %s", disk)
1816

    
1817
  try:
1818
    r_dev.Grow(amount)
1819
  except errors.BlockDeviceError, err:
1820
    _Fail("Failed to grow block device: %s", err, exc=True)
1821

    
1822

    
1823
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  The snapshot is taken at the LVM leaf of the device tree; for
  composite devices we recurse into the child whose size matches the
  whole disk.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # single child: recurse into it directly
      return BlockdevSnapshot(disk.children[0])
    # several children: pick the one whose size matches the disk
    for child in disk.children:
      if child.size == disk.size:
        return BlockdevSnapshot(child)
    # NOTE(review): if no child matches the size we fall through and
    # implicitly return None - presumably this cannot happen with a
    # valid disk tree, but verify against the callers
  elif disk.dev_type == constants.LD_LV:
    rdev = _RecursiveFindBD(disk)
    if rdev is None:
      _Fail("Cannot find block device %s", disk)
    # let's stay on the safe side and ask for the full size, for now
    return rdev.Snapshot(disk.size)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1858
  """Export a block device snapshot to a remote node.
1859

1860
  @type disk: L{objects.Disk}
1861
  @param disk: the description of the disk to export
1862
  @type dest_node: str
1863
  @param dest_node: the destination node to export to
1864
  @type instance: L{objects.Instance}
1865
  @param instance: the instance object to whom the disk belongs
1866
  @type cluster_name: str
1867
  @param cluster_name: the cluster name, needed for SSH hostalias
1868
  @type idx: int
1869
  @param idx: the index of the disk in the instance's disk list,
1870
      used to export to the OS scripts environment
1871
  @rtype: None
1872

1873
  """
1874
  inst_os = OSFromDisk(instance.os)
1875
  export_env = OSEnvironment(instance, inst_os)
1876

    
1877
  export_script = inst_os.export_script
1878

    
1879
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1880
                                     instance.name, int(time.time()))
1881
  if not os.path.exists(constants.LOG_OS_DIR):
1882
    os.mkdir(constants.LOG_OS_DIR, 0750)
1883
  real_disk = _RecursiveFindBD(disk)
1884
  if real_disk is None:
1885
    _Fail("Block device '%s' is not set up", disk)
1886

    
1887
  real_disk.Open()
1888

    
1889
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
1890
  export_env['EXPORT_INDEX'] = str(idx)
1891

    
1892
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1893
  destfile = disk.physical_id[1]
1894

    
1895
  # the target command is built out of three individual commands,
1896
  # which are joined by pipes; we check each individual command for
1897
  # valid parameters
1898
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1899
                               inst_os.path, export_script, logfile)
1900

    
1901
  comprcmd = "gzip"
1902

    
1903
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1904
                                destdir, destdir, destfile)
1905
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1906
                                                   constants.GANETI_RUNAS,
1907
                                                   destcmd)
1908

    
1909
  # all commands have been checked, so we're safe to combine them
1910
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1911

    
1912
  result = utils.RunCmd(["bash", "-c", command], env=export_env)
1913

    
1914
  if result.failed:
1915
    _Fail("OS snapshot export command '%s' returned error: %s"
1916
          " output: %s", command, result.fail_reason, result.output)
1917

    
1918

    
1919
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  The config is first written to a ".new" staging directory, which is
  then atomically moved over any previous export of the same instance.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  cfg = objects.SerializableConfigParser()

  # export-level metadata
  cfg.add_section(constants.INISECT_EXP)
  cfg.set(constants.INISECT_EXP, 'version', '0')
  cfg.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  cfg.set(constants.INISECT_EXP, 'source', instance.primary_node)
  cfg.set(constants.INISECT_EXP, 'os', instance.os)
  cfg.set(constants.INISECT_EXP, 'compression', 'gzip')

  # instance-level metadata
  cfg.add_section(constants.INISECT_INS)
  cfg.set(constants.INISECT_INS, 'name', instance.name)
  cfg.set(constants.INISECT_INS, 'memory', '%d' %
          instance.beparams[constants.BE_MEMORY])
  cfg.set(constants.INISECT_INS, 'vcpus', '%d' %
          instance.beparams[constants.BE_VCPUS])
  cfg.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_idx, nic in enumerate(instance.nics):
    nic_total += 1
    cfg.set(constants.INISECT_INS, 'nic%d_mac' % nic_idx, '%s' % nic.mac)
    cfg.set(constants.INISECT_INS, 'nic%d_ip' % nic_idx, '%s' % nic.ip)
    cfg.set(constants.INISECT_INS, 'nic%d_bridge' % nic_idx,
            '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  cfg.set(constants.INISECT_INS, 'nic_count', '%d' % nic_total)

  # note: only existing snapshots are counted in disk_count, but the
  # per-disk keys still use the position in the original disk list
  disk_total = 0
  for disk_idx, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      cfg.set(constants.INISECT_INS, 'disk%d_ivname' % disk_idx,
              ('%s' % disk.iv_name))
      cfg.set(constants.INISECT_INS, 'disk%d_dump' % disk_idx,
              ('%s' % disk.physical_id[1]))
      cfg.set(constants.INISECT_INS, 'disk%d_size' % disk_idx,
              ('%d' % disk.size))

  cfg.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=cfg.Dumps())
  # replace any previous export atomically-ish: remove then move
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info
  @raise RPCFail: if the export info file lacks the required sections

  """
  conf_file = os.path.join(dest, constants.EXPORT_CONF_FILE)

  parser = objects.SerializableConfigParser()
  parser.read(conf_file)

  # both the export and the instance sections must be present
  if not (parser.has_section(constants.INISECT_EXP) and
          parser.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return parser.Dumps()
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2006
  """Import an os image into an instance.
2007

2008
  @type instance: L{objects.Instance}
2009
  @param instance: instance to import the disks into
2010
  @type src_node: string
2011
  @param src_node: source node for the disk images
2012
  @type src_images: list of string
2013
  @param src_images: absolute paths of the disk images
2014
  @rtype: list of boolean
2015
  @return: each boolean represent the success of importing the n-th disk
2016

2017
  """
2018
  inst_os = OSFromDisk(instance.os)
2019
  import_env = OSEnvironment(instance, inst_os)
2020
  import_script = inst_os.import_script
2021

    
2022
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2023
                                        instance.name, int(time.time()))
2024
  if not os.path.exists(constants.LOG_OS_DIR):
2025
    os.mkdir(constants.LOG_OS_DIR, 0750)
2026

    
2027
  comprcmd = "gunzip"
2028
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2029
                               import_script, logfile)
2030

    
2031
  final_result = []
2032
  for idx, image in enumerate(src_images):
2033
    if image:
2034
      destcmd = utils.BuildShellCmd('cat %s', image)
2035
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2036
                                                       constants.GANETI_RUNAS,
2037
                                                       destcmd)
2038
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2039
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2040
      import_env['IMPORT_INDEX'] = str(idx)
2041
      result = utils.RunCmd(command, env=import_env)
2042
      if result.failed:
2043
        logging.error("Disk import command '%s' returned error: %s"
2044
                      " output: %s", command, result.fail_reason,
2045
                      result.output)
2046
        final_result.append("error importing disk %d: %s, %s" %
2047
                            (idx, result.fail_reason, result.output[-100]))
2048

    
2049
  if final_result:
2050
    _Fail("; ".join(final_result), log=False)
2051

    
2052

    
2053
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports
  @raise RPCFail: if the exports directory does not exist

  """
  # guard clause: no export dir means an error, not an empty list
  if not os.path.isdir(constants.EXPORT_DIR):
    _Fail("No exports directory")
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
def RemoveExport(export):
2067
  """Remove an existing export from the node.
2068

2069
  @type export: str
2070
  @param export: the name of the export to remove
2071
  @rtype: None
2072

2073
  """
2074
  target = os.path.join(constants.EXPORT_DIR, export)
2075

    
2076
  try:
2077
    shutil.rmtree(target)
2078
  except EnvironmentError, err:
2079
    _Fail("Error while removing the export: %s", err, exc=True)
2080

    
2081

    
2082
def BlockdevRename(devlist):
2083
  """Rename a list of block devices.
2084

2085
  @type devlist: list of tuples
2086
  @param devlist: list of tuples of the form  (disk,
2087
      new_logical_id, new_physical_id); disk is an
2088
      L{objects.Disk} object describing the current disk,
2089
      and new logical_id/physical_id is the name we
2090
      rename it to
2091
  @rtype: boolean
2092
  @return: True if all renames succeeded, False otherwise
2093

2094
  """
2095
  msgs = []
2096
  result = True
2097
  for disk, unique_id in devlist:
2098
    dev = _RecursiveFindBD(disk)
2099
    if dev is None:
2100
      msgs.append("Can't find device %s in rename" % str(disk))
2101
      result = False
2102
      continue
2103
    try:
2104
      old_rpath = dev.dev_path
2105
      dev.Rename(unique_id)
2106
      new_rpath = dev.dev_path
2107
      if old_rpath != new_rpath:
2108
        DevCacheManager.RemoveCache(old_rpath)
2109
        # FIXME: we should add the new cache information here, like:
2110
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2111
        # but we don't have the owner here - maybe parse from existing
2112
        # cache? for now, we only lose lvm data when we rename, which
2113
        # is less critical than DRBD or MD
2114
    except errors.BlockDeviceError, err:
2115
      msgs.append("Can't rename device '%s' to '%s': %s" %
2116
                  (dev, unique_id, err))
2117
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2118
      result = False
2119
  if not result:
2120
    _Fail("; ".join(msgs))
2121

    
2122

    
2123
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under
  that directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise
  @raise RPCFail: if the path is outside the base storage directory

  """
  base_dir = _GetConfig().GetFileStorageDir()
  file_storage_dir = os.path.normpath(file_storage_dir)
  # note: commonprefix works character-wise, which is what the
  # original check relied on as well
  if os.path.commonprefix([file_storage_dir, base_dir]) != base_dir:
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_dir)
  return file_storage_dir
def CreateFileStorageDir(file_storage_dir):
2147
  """Create file storage directory.
2148

2149
  @type file_storage_dir: str
2150
  @param file_storage_dir: directory to create
2151

2152
  @rtype: tuple
2153
  @return: tuple with first element a boolean indicating wheter dir
2154
      creation was successful or not
2155

2156
  """
2157
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2158
  if os.path.exists(file_storage_dir):
2159
    if not os.path.isdir(file_storage_dir):
2160
      _Fail("Specified storage dir '%s' is not a directory",
2161
            file_storage_dir)
2162
  else:
2163
    try:
2164
      os.makedirs(file_storage_dir, 0750)
2165
    except OSError, err:
2166
      _Fail("Cannot create file storage directory '%s': %s",
2167
            file_storage_dir, err, exc=True)
2168

    
2169

    
2170
def RemoveFileStorageDir(file_storage_dir):
2171
  """Remove file storage directory.
2172

2173
  Remove it only if it's empty. If not log an error and return.
2174

2175
  @type file_storage_dir: str
2176
  @param file_storage_dir: the directory we should cleanup
2177
  @rtype: tuple (success,)
2178
  @return: tuple of one element, C{success}, denoting
2179
      whether the operation was successful
2180

2181
  """
2182
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2183
  if os.path.exists(file_storage_dir):
2184
    if not os.path.isdir(file_storage_dir):
2185
      _Fail("Specified Storage directory '%s' is not a directory",
2186
            file_storage_dir)
2187
    # deletes dir only if empty, otherwise we want to fail the rpc call
2188
    try:
2189
      os.rmdir(file_storage_dir)
2190
    except OSError, err:
2191
      _Fail("Cannot remove file storage directory '%s': %s",
2192
            file_storage_dir, err)
2193

    
2194

    
2195
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2196
  """Rename the file storage directory.
2197

2198
  @type old_file_storage_dir: str
2199
  @param old_file_storage_dir: the current path
2200
  @type new_file_storage_dir: str
2201
  @param new_file_storage_dir: the name we should rename to
2202
  @rtype: tuple (success,)
2203
  @return: tuple of one element, C{success}, denoting
2204
      whether the operation was successful
2205

2206
  """
2207
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2208
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2209
  if not os.path.exists(new_file_storage_dir):
2210
    if os.path.isdir(old_file_storage_dir):
2211
      try:
2212
        os.rename(old_file_storage_dir, new_file_storage_dir)
2213
      except OSError, err:
2214
        _Fail("Cannot rename '%s' to '%s': %s",
2215
              old_file_storage_dir, new_file_storage_dir, err)
2216
    else:
2217
      _Fail("Specified storage dir '%s' is not a directory",
2218
            old_file_storage_dir)
2219
  else:
2220
    if os.path.exists(old_file_storage_dir):
2221
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2222
            old_file_storage_dir, new_file_storage_dir)
2223

    
2224

    
2225
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # commonprefix equals the queue dir exactly when file_name is below it
  if os.path.commonprefix([queue_dir, file_name]) != queue_dir:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents, possibly compressed
  @rtype: None

  """
  # refuse to write outside the queue directory
  _EnsureJobQueueFile(file_name)
  # write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None

  """
  # both ends of the rename must live inside the queue directory
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)
  utils.RenameFile(old, new, mkdir=True)
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  The flag is represented by the presence of the drain file: setting
  the flag creates it (empty), clearing the flag removes it.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: None

  """
  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  else:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
def BlockdevClose(instance_name, disks):
2300
  """Closes the given block devices.
2301

2302
  This means they will be switched to secondary mode (in case of
2303
  DRBD).
2304

2305
  @param instance_name: if the argument is not empty, the symlinks
2306
      of this instance will be removed
2307
  @type disks: list of L{objects.Disk}
2308
  @param disks: the list of disks to be closed
2309
  @rtype: tuple (success, message)
2310
  @return: a tuple of success and message, where success
2311
      indicates the succes of the operation, and message
2312
      which will contain the error details in case we
2313
      failed
2314

2315
  """
2316
  bdevs = []
2317
  for cf in disks:
2318
    rd = _RecursiveFindBD(cf)
2319
    if rd is None:
2320
      _Fail("Can't find device %s", cf)
2321
    bdevs.append(rd)
2322

    
2323
  msg = []
2324
  for rd in bdevs:
2325
    try:
2326
      rd.Close()
2327
    except errors.BlockDeviceError, err:
2328
      msg.append(str(err))
2329
  if msg:
2330
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2331
  else:
2332
    if instance_name:
2333
      _RemoveBlockDevLinks(instance_name, disks)
2334

    
2335

    
2336
def ValidateHVParams(hvname, hvparams):
2337
  """Validates the given hypervisor parameters.
2338

2339
  @type hvname: string
2340
  @param hvname: the hypervisor name
2341
  @type hvparams: dict
2342
  @param hvparams: the hypervisor parameters to be validated
2343
  @rtype: None
2344

2345
  """
2346
  try:
2347
    hv_type = hypervisor.GetHypervisor(hvname)
2348
    hv_type.ValidateParameters(hvparams)
2349
  except errors.HypervisorError, err:
2350
    _Fail(str(err), log=False)
2351

    
2352

    
2353
def DemoteFromMC():
2354
  """Demotes the current node from master candidate role.
2355

2356
  """
2357
  # try to ensure we're not the master by mistake
2358
  master, myself = ssconf.GetMasterAndMyself()
2359
  if master == myself:
2360
    _Fail("ssconf status shows I'm the master node, will not demote")
2361
  pid_file = utils.DaemonPidFileName(constants.MASTERD)
2362
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2363
    _Fail("The master daemon is running, will not demote")
2364
  try:
2365
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2366
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2367
  except EnvironmentError, err:
2368
    if err.errno != errno.ENOENT:
2369
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2370
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2371

    
2372

    
2373
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @type disks: list of L{objects.Disk}
  @param disks: the disks to look up
  @return: list of the found block devices, in the same order
  @raise RPCFail: if any disk cannot be found

  """
  my_name = utils.HostInfo().name
  # fix up all the physical IDs first, before any lookup
  for disk in disks:
    disk.SetPhysicalID(my_name, nodes_ip)

  bdevs = []
  for disk in disks:
    rdev = _RecursiveFindBD(disk)
    if rdev is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(rdev)
  return bdevs
def DrbdDisconnectNet(nodes_ip, disks):
2393
  """Disconnects the network on a list of drbd devices.
2394

2395
  """
2396
  bdevs = _FindDisks(nodes_ip, disks)
2397

    
2398
  # disconnect disks
2399
  for rd in bdevs:
2400
    try:
2401
      rd.DisconnectNet()
2402
    except errors.BlockDeviceError, err:
2403
      _Fail("Can't change network configuration to standalone mode: %s",
2404
            err, exc=True)
2405

    
2406

    
2407
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  Reconnects the given DRBD devices and waits (up to two minutes) for
  them to reach connected or resyncing state; in multimaster mode the
  instance symlinks are (re)created first and the devices are switched
  to primary mode at the end.

  @param nodes_ip: node-to-ip mapping passed through to
      L{_FindDisks} - TODO confirm exact shape against callers
  @type disks: list of L{objects.Disk}
  @param disks: the disks to operate on
  @type instance_name: str
  @param instance_name: the instance owning the disks (used for the
      block device symlinks in multimaster mode)
  @type multimaster: boolean
  @param multimaster: whether both sides should become primary
  @rtype: None
  @raise RPCFail: on lookup, symlink, reconfiguration or timeout errors

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)
  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
  RECONNECT_TIMEOUT = 2 * 60
  sleep_time = 0.100 # start with 100 miliseconds
  timeout_limit = time.time() + RECONNECT_TIMEOUT
  while time.time() < timeout_limit:
    all_connected = True
    for rd in bdevs:
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        all_connected = False
      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)
    if all_connected:
      break
    # exponential backoff, capped at 5 seconds per iteration
    time.sleep(sleep_time)
    sleep_time = min(5, sleep_time * 1.5)
  if not all_connected:
    _Fail("Timeout in disk reconnecting")
  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @rtype: tuple
  @return: a tuple (alldone, min_resync) where alldone is True when no
      device is resyncing any more, and min_resync is the lowest sync
      percentage seen (100 if none reported)
  @raise RPCFail: if any device is neither connected nor resyncing

  """
  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rdev in bdevs:
    stats = rdev.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      _Fail("DRBD device %s is not in sync: stats=%s", rdev, stats)
    if stats.is_in_resync:
      alldone = False
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent: report back immediately, the child does the real work
    return "Reboot scheduled in 5 seconds"
  # child (or failed fork): give the reply time to reach the caller,
  # then powercycle the node
  time.sleep(5)
  hyper.PowercycleNode()
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  # only scripts whose (base) name matches this pattern are executed;
  # anything else (e.g. editor backup files) is reported as skipped
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    The script is run with stdin from /dev/null, stdout and stderr
    merged into a single pipe, and cwd set to the root directory.

    @type script: str
    @param script: the full path to the script
    @type env: dict
    @param env: the environment with which to exec the script
    @rtype: tuple (success, message)
    @return: a tuple of success and message, where success
        indicates the succes of the operation, and message
        which will contain the error details in case we
        failed

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # only the first 4 kB of combined stdout/stderr is captured;
        # the pipe is closed right afterwards
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      # wait for the child, retrying when interrupted by a signal
      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, utils.SafeEncode(output.strip())

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError:
      # FIXME: must log output in case of failures
      # a missing/unreadable hooks dir simply means no hooks to run
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # non-files, non-executables and oddly-named entries are skipped,
      # not failed
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script
    @raise RPCFail: if the script cannot be found or exits with an
        error

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # the input is handed over via a temporary file, which is always
    # removed, even on failure
    fd, input_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, input_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(input_name)

    return result.stdout
class DevCacheManager(object):
2672
  """Simple class for managing a cache of block device information.
2673

2674
  """
2675
  _DEV_PREFIX = "/dev/"
2676
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2677

    
2678
  @classmethod
2679
  def _ConvertPath(cls, dev_path):
2680
    """Converts a /dev/name path to the cache file name.
2681

2682
    This replaces slashes with underscores and strips the /dev
2683
    prefix. It then returns the full path to the cache file.
2684

2685
    @type dev_path: str
2686
    @param dev_path: the C{/dev/} path name
2687
    @rtype: str
2688
    @return: the converted path name
2689

2690
    """
2691
    if dev_path.startswith(cls._DEV_PREFIX):
2692
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2693
    dev_path = dev_path.replace("/", "_")
2694
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2695
    return fpath
2696

    
2697
  @classmethod
2698
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2699
    """Updates the cache information for a given device.
2700

2701
    @type dev_path: str
2702
    @param dev_path: the pathname of the device
2703
    @type owner: str
2704
    @param owner: the owner (instance name) of the device
2705
    @type on_primary: bool
2706
    @param on_primary: whether this is the primary
2707
        node nor not
2708
    @type iv_name: str
2709
    @param iv_name: the instance-visible name of the
2710
        device, as in objects.Disk.iv_name
2711

2712
    @rtype: None
2713

2714
    """
2715
    if dev_path is None:
2716
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2717
      return
2718
    fpath = cls._ConvertPath(dev_path)
2719
    if on_primary:
2720
      state = "primary"
2721
    else:
2722
      state = "secondary"
2723
    if iv_name is None:
2724
      iv_name = "not_visible"
2725
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2726
    try:
2727
      utils.WriteFile(fpath, data=fdata)
2728
    except EnvironmentError, err:
2729
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2730

    
2731
  @classmethod
2732
  def RemoveCache(cls, dev_path):
2733
    """Remove data for a dev_path.
2734

2735
    This is just a wrapper over L{utils.RemoveFile} with a converted
2736
    path name and logging.
2737

2738
    @type dev_path: str
2739
    @param dev_path: the pathname of the device
2740

2741
    @rtype: None
2742

2743
    """
2744
    if dev_path is None:
2745
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2746
      return
2747
    fpath = cls._ConvertPath(dev_path)
2748
    try:
2749
      utils.RemoveFile(fpath)
2750
    except EnvironmentError, err:
2751
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)