#
#

# Copyright (C) 2006, 2007, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable-msg=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
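
# Usage sketch (illustrative, not part of the original module): the
# printf-style arguments are interpolated before logging and raising,
# and exc=True additionally logs the active exception's traceback.
#
#   try:
#     utils.RemoveFile("/var/lib/ganeti/some-file")   # hypothetical path
#   except EnvironmentError, err:
#     _Fail("Can't remove file: %s", err, exc=True)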


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
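
# Example (sketch, not from the original source): both encodings accepted
# above round-trip a small payload.
#
#   plain = (constants.RPC_ENCODING_NONE, "hello")
#   packed = (constants.RPC_ENCODING_ZLIB_BASE64,
#             base64.b64encode(zlib.compress("hello")))
#   assert _Decompress(plain) == _Decompress(packed) == "hello"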


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: tuple
  @return: True, None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_node
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node)


def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will either try to activate the IP address of the master
  (unless someone else has it) or also start the master daemons, based
  on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to start the master daemons
      (ganeti-masterd and ganeti-rapi), or (if false) activate the
      master ip
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  err_msgs = []
  # either start the master and rapi daemons
  if start_daemons:
    if no_voting:
      masterd_args = "--no-voting --yes-do-it"
    else:
      masterd_args = ""

    env = {
      "EXTRA_MASTERD_ARGS": masterd_args,
      }

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      msg = "Can't start Ganeti master: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)
  # or activate the IP
  else:
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
      if netutils.IPAddress.Own(master_ip):
        # we already have the ip:
        logging.debug("Master IP already configured, doing nothing")
      else:
        msg = "Someone else has the master ip, not activating"
        logging.error(msg)
        err_msgs.append(msg)
    else:
      netmask = 32
      if netutils.IP6Address.IsValid(master_ip):
        netmask = 128

      result = utils.RunCmd(["ip", "address", "add",
                             "%s/%d" % (master_ip, netmask),
                             "dev", master_netdev, "label",
                             "%s:0" % master_netdev])
      if result.failed:
        msg = "Can't activate master IP: %s" % result.output
        logging.error(msg)
        err_msgs.append(msg)

      # we ignore the exit code of the following cmds
      if netutils.IP4Address.IsValid(master_ip):
        utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                      master_ip, master_ip])
      elif netutils.IP6Address.IsValid(master_ip):
        utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])

  if err_msgs:
    _Fail("; ".join(err_msgs))


def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  netmask = 32
  if netutils.IP6Address.IsValid(master_ip):
    netmask = 128

  result = utils.RunCmd(["ip", "address", "del",
                         "%s/%d" % (master_ip, netmask),
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                    " and error %s",
                    result.cmd, result.exit_code, result.output)


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shut down the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable-msg=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')


def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total amount of ram in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray
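
# Result sketch (values invented for illustration):
#
#   {"vg_size": 409600, "vg_free": 287744,     # from _GetVGInfo
#    "memory_dom0": 1024, "memory_free": 3072, # from the hypervisor
#    "memory_total": 4096,
#    "bootid": "7c9ad93b-..."}                 # from the boot_id file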


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_LVLIST in what:
    try:
      val = GetVolumeList(what[constants.NV_LVLIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what:
    result[constants.NV_OSLIST] = DiagnoseOS()

  return result
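
# Request sketch (illustrative): the master sends a dict keyed by the
# NV_* constants handled above, e.g.
#
#   what = {
#     constants.NV_FILELIST: [constants.CLUSTER_CONF_FILE],
#     constants.NV_VERSION: None,
#     }
#   report = VerifyNode(what, cluster_name)
#   # report[constants.NV_VERSION] would then be
#   # (constants.PROTOCOL_VERSION, constants.RELEASE_VERSION)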


def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[name] = (size, inactive, online)

  return lvs
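
# Parsing sketch: an lvs line such as "  test1|20.06|-wi-ao|" matches the
# regexp above with name="test1", size="20.06", attr="-wi-ao"; attr[4] and
# attr[5] then give inactive=False, online=True, so the entry becomes
#   {'test1': ('20.06', False, True)}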


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split('(')[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{'name': line[0], 'size': line[1],
             'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count('|') >= 3:
      all_devs.extend(map_line(line.split('|')))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs
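
# Parsing sketch: lvs prints the devices column as e.g.
# "/dev/sda1(0),/dev/sdb1(0)"; handle_dev/parse_dev above strip the
# "(extent)" suffixes, so one LV spanning two PVs yields two result
# entries, matching the note in the docstring.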


def BridgesExist(bridges_list):
  """Check if a list of bridges exists on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output
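
# Result sketch (invented values, xen-style state string): one dict entry
# per instance name, e.g.
#
#   {"instance1.example.com":
#      {"memory": 512, "vcpus": 1, "state": "-b----", "time": 1234.5}}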


def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  base = ("%s-%s-%s-%s.log" %
          (kind, os_name, instance, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetVGInfo(vg_name):
  """Get information about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} is the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except (TypeError, ValueError), err:
      logging.exception("Failed to parse vgs output: %s", err)
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
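
# Parsing sketch: with "--separator=:" the vgs output for one VG looks
# roughly like "  409600.00:287744.00:2" (illustrative values), which the
# code above turns into
#   {"vg_size": 409600, "vg_free": 287744, "pv_count": 2}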


def _GetBlockDevSymlinkPath(instance_name, idx):
  return utils.PathJoin(constants.DISK_LINKS_DIR,
                        "%s:%d" % (instance_name, idx))


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name
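
# Sketch (instance name and device are illustrative): for instance
# "instance1.example.com" and disk index 0 the link is
# utils.PathJoin(constants.DISK_LINKS_DIR, "instance1.example.com:0"),
# pointing at whatever physical device path was passed in.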


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices


def StartInstance(instance):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigration(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigration(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration: %s", err, exc=True)


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @rtype: tuple
  @return: a tuple of (success, msg) where:
      - success is a boolean denoting the success/failure of the operation
      - msg is a string with details in case of failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete 'size' argument
  # pylint: disable-msg=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable-msg=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


def BlockdevAssemble(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable-msg=E1103
      result = result.dev_path
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shut down. Then the children of the device are
  shut down.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble; shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           status information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use
  # oflag=dsync to not buffer too much memory; this means that at
  # best, we flush every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
1646

    
1647

    
1648
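# For illustration only (hypothetical paths): for a 1024 MiB LV the
# combined pipeline built above looks roughly like
#
#   set -e; set -o pipefail; dd if=/dev/xenvg/disk0 bs=1048576 count=1024 \
#     | ssh <dest_node> 'dd of=/dest/path conv=nocreat,notrunc bs=65536 oflag=dsync'
#
# i.e. a large-block read on the source piped into a small-block,
# synchronous write on the destination.

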
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, 'errno'):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


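# For illustration: the API version file is a plain text file listing one
# integer version per line, e.g. a file containing
#
#   10
#   15
#   20
#
# is parsed by the code above into [10, 15, 20].

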
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


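# For illustration (hypothetical OS): a valid entry in the returned list
# could look like
#
#   ("debootstrap", "/srv/ganeti/os/debootstrap", True, "",
#    ["default", "squeeze"], [["mirror", "Debian mirror to use"]], [20])
#
# while an invalid OS carries status False, the diagnose message, and
# empty lists for the remaining fields.

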
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path names
  os_files = dict.fromkeys(constants.OS_SCRIPTS)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = ''

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = ''
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for filename in os_files:
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS variants file at %s: %s" %
                     (variants_file, _ErrnoOrStr(err)))
    if not variants:
      return False, ("No supported os variant found")

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, _ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = name.split("+", 1)[0]
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['OS_NAME'] = inst_os.name
  result['DEBUG_LEVEL'] = '%d' % debug

  # OS variants
  if api_version >= constants.OS_API_V15:
    try:
      variant = inst_os.name.split('+', 1)[1]
    except IndexError:
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result['OSP_%s' % pname.upper()] = pvalue

  return result


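# For illustration (hypothetical values): for an OS named "debootstrap"
# whose first supported variant is "squeeze", called with
# os_params={"mirror": "http://ftp.debian.org"} under API version 20,
# the resulting dict would contain roughly
#
#   {"OS_API_VERSION": "20", "OS_NAME": "debootstrap", "DEBUG_LEVEL": "0",
#    "OS_VARIANT": "squeeze", "OSP_MIRROR": "http://ftp.debian.org"}

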
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(inst_os, instance.osparams, debug=debug)

  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def BlockdevGrow(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @rtype: (status, result)
  @return: a tuple with the status of the operation
      (True/False), and the error message if status
      is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
                 '%s' % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


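# For illustration (hypothetical instance): the file written above is a
# standard INI file starting roughly like
#
#   [export]
#   version = 0
#   timestamp = 1300000000
#   source = node1.example.com
#   os = debootstrap
#   compression = gzip
#
#   [instance]
#   name = inst1.example.com
#   memory = 512
#   vcpus = 1
#
# (the actual section names come from constants.INISECT_EXP and
# constants.INISECT_INS).

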
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: the serialized contents of the config file holding the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(constants.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the logical_id/physical_id we rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
      base_file_storage_dir):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir


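# Caveat worth noting: os.path.commonprefix compares strings character by
# character rather than path components, so with a (hypothetical) base of
# "/srv/ganeti",
#
#   os.path.commonprefix(["/srv/ganeti-other", "/srv/ganeti"])
#
# returns "/srv/ganeti" and "/srv/ganeti-other" would pass the check above.

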
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      contains the error details in case we failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = osname.split("+", 1)[0]
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(constants.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


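# For illustration (hypothetical arguments):
# _GetX509Filenames("/var/lib/ganeti/crypto", "x509-1234-abcd") returns
#
#   ("/var/lib/ganeti/crypto/x509-1234-abcd",
#    "/var/lib/ganeti/crypto/x509-1234-abcd/key",
#    "/var/lib/ganeti/crypto/x509-1234-abcd/cert")
#
# i.e. (directory, private key file, certificate file), using the
# module-level _X509_KEY_FILE and _X509_CERT_FILE names.

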
def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    directory = os.path.normpath(os.path.dirname(filename))

    if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
        constants.EXPORT_DIR):
      _Fail("File '%s' is not under exports directory '%s'",
            filename, constants.EXPORT_DIR)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly be ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc
      # to not attempt truncation on an LV device; we use oflag=dsync to not
      # buffer too much memory; this means that at best, we flush every 64k,
      # which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


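# For illustration (hypothetical device): exporting a 1024 MiB disk via
# IEIO_RAW_DISK yields roughly
#
#   env      = None
#   prefix   = "dd if=/dev/xenvg/disk0 bs=1048576 count=1024 |"
#   suffix   = None
#   exp_size = 1024
#
# and the import/export daemon splices its transport between the prefix
# and suffix commands.

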
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = constants.NODED_CERT_FILE
    cert_path = constants.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir(prefix)
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      constants.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    logfile = _InstanceLogName(prefix, instance.os, instance.name)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or
           None if a status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will
  # decide it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


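# For illustration: the (0.1, 1.5, 5.0) delay spec above is interpreted by
# utils.Retry as (start, factor, limit), i.e. waits of roughly 0.1s,
# 0.15s, 0.225s, ... capped at 5.0s between attempts, for at most
# 2 * 60 = 120 seconds before utils.RetryTimeout is raised.

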
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


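# For illustration: the (alldone, min_resync) result reads as follows:
# (True, 100) means every device is connected and fully synchronized,
# while e.g. (False, 42.1) means at least one device is still resyncing
# and the least-synchronized one is about 42% done.

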
def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable-msg=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


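# For illustration (hypothetical hook): with hpath="instance-start" and
# the pre phase, scripts are collected from
#
#   <hooks_base_dir>/instance-start-pre.d/
#
# and a result entry for a successful script could look like
#
#   ("instance-start-pre.d/00-check", constants.HKR_SUCCESS, "ok")

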
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script on success; failures
        (script not found or non-zero exit code) raise L{RPCFail}
        via L{_Fail}

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
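

# A minimal usage sketch (commented out); "hail" as allocator name and
# the stub request are illustrative assumptions -- real input must
# follow the iallocator protocol:
#
#   request = {"version": 2, "cluster_name": "cluster.example.com"}
#   stdout = IAllocatorRunner.Run("hail", serializer.DumpJson(request))
#   response = serializer.LoadJson(stdout)
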
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath
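
  # Illustrative conversions (device names are assumptions for the
  # example):
  #   /dev/drbd0     -> <BDEV_CACHE_DIR>/bdev_drbd0
  #   /dev/xenvg/lv1 -> <BDEV_CACHE_DIR>/bdev_xenvg_lv1
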
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
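
  # The cache file written above holds a single "<owner> <state>
  # <iv_name>" line, e.g. (illustrative values):
  #   instance1.example.com primary disk/0
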
  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache file for %s: %s",
                        dev_path, err)
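

# A minimal usage sketch (commented out); device path, owner and
# iv_name are illustrative assumptions:
#
#   DevCacheManager.UpdateCache("/dev/drbd0", "instance1.example.com",
#                               on_primary=True, iv_name="disk/0")
#   DevCacheManager.RemoveCache("/dev/drbd0")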