Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ acd9ff9e

History | View | Annotate | Download (97.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable-msg=E1103
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37

    
38
import os
39
import os.path
40
import shutil
41
import time
42
import stat
43
import errno
44
import re
45
import random
46
import logging
47
import tempfile
48
import zlib
49
import base64
50
import signal
51

    
52
from ganeti import errors
53
from ganeti import utils
54
from ganeti import ssh
55
from ganeti import hypervisor
56
from ganeti import constants
57
from ganeti import bdev
58
from ganeti import objects
59
from ganeti import ssconf
60
from ganeti import serializer
61

    
62

    
63
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
64
_ALLOWED_CLEAN_DIRS = frozenset([
65
  constants.DATA_DIR,
66
  constants.JOB_QUEUE_ARCHIVE_DIR,
67
  constants.QUEUE_DIR,
68
  constants.CRYPTO_KEYS_DIR,
69
  ])
70
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
71
_X509_KEY_FILE = "key"
72
_X509_CERT_FILE = "cert"
73
_IES_STATUS_FILE = "status"
74
_IES_PID_FILE = "pid"
75
_IES_CA_FILE = "ca"
76

    
77

    
78
class RPCFail(Exception):
79
  """Class denoting RPC failure.
80

81
  Its argument is the error message.
82

83
  """
84

    
85

    
86
def _Fail(msg, *args, **kwargs):
87
  """Log an error and the raise an RPCFail exception.
88

89
  This exception is then handled specially in the ganeti daemon and
90
  turned into a 'failed' return type. As such, this function is a
91
  useful shortcut for logging the error and returning it to the master
92
  daemon.
93

94
  @type msg: string
95
  @param msg: the text of the exception
96
  @raise RPCFail
97

98
  """
99
  if args:
100
    msg = msg % args
101
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
102
    if "exc" in kwargs and kwargs["exc"]:
103
      logging.exception(msg)
104
    else:
105
      logging.error(msg)
106
  raise RPCFail(msg)
107

    
108

    
109
def _GetConfig():
110
  """Simple wrapper to return a SimpleStore.
111

112
  @rtype: L{ssconf.SimpleStore}
113
  @return: a SimpleStore instance
114

115
  """
116
  return ssconf.SimpleStore()
117

    
118

    
119
def _GetSshRunner(cluster_name):
120
  """Simple wrapper to return an SshRunner.
121

122
  @type cluster_name: str
123
  @param cluster_name: the cluster name, which is needed
124
      by the SshRunner constructor
125
  @rtype: L{ssh.SshRunner}
126
  @return: an SshRunner instance
127

128
  """
129
  return ssh.SshRunner(cluster_name)
130

    
131

    
132
def _Decompress(data):
133
  """Unpacks data compressed by the RPC client.
134

135
  @type data: list or tuple
136
  @param data: Data sent by RPC client
137
  @rtype: str
138
  @return: Decompressed data
139

140
  """
141
  assert isinstance(data, (list, tuple))
142
  assert len(data) == 2
143
  (encoding, content) = data
144
  if encoding == constants.RPC_ENCODING_NONE:
145
    return content
146
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
147
    return zlib.decompress(base64.b64decode(content))
148
  else:
149
    raise AssertionError("Unknown data encoding")
150

    
151

    
152
def _CleanDirectory(path, exclude=None):
153
  """Removes all regular files in a directory.
154

155
  @type path: str
156
  @param path: the directory to clean
157
  @type exclude: list
158
  @param exclude: list of files to be excluded, defaults
159
      to the empty list
160

161
  """
162
  if path not in _ALLOWED_CLEAN_DIRS:
163
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
164
          path)
165

    
166
  if not os.path.isdir(path):
167
    return
168
  if exclude is None:
169
    exclude = []
170
  else:
171
    # Normalize excluded paths
172
    exclude = [os.path.normpath(i) for i in exclude]
173

    
174
  for rel_name in utils.ListVisibleFiles(path):
175
    full_name = utils.PathJoin(path, rel_name)
176
    if full_name in exclude:
177
      continue
178
    if os.path.isfile(full_name) and not os.path.islink(full_name):
179
      utils.RemoveFile(full_name)
180

    
181

    
182
def _BuildUploadFileList():
183
  """Build the list of allowed upload files.
184

185
  This is abstracted so that it's built only once at module import time.
186

187
  """
188
  allowed_files = set([
189
    constants.CLUSTER_CONF_FILE,
190
    constants.ETC_HOSTS,
191
    constants.SSH_KNOWN_HOSTS_FILE,
192
    constants.VNC_PASSWORD_FILE,
193
    constants.RAPI_CERT_FILE,
194
    constants.RAPI_USERS_FILE,
195
    constants.CONFD_HMAC_KEY,
196
    constants.CLUSTER_DOMAIN_SECRET_FILE,
197
    ])
198

    
199
  for hv_name in constants.HYPER_TYPES:
200
    hv_class = hypervisor.GetHypervisorClass(hv_name)
201
    allowed_files.update(hv_class.GetAncillaryFiles())
202

    
203
  return frozenset(allowed_files)
204

    
205

    
206
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
207

    
208

    
209
def JobQueuePurge():
210
  """Removes job queue files and archived jobs.
211

212
  @rtype: tuple
213
  @return: True, None
214

215
  """
216
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
217
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
218

    
219

    
220
def GetMasterInfo():
221
  """Returns master information.
222

223
  This is an utility function to compute master information, either
224
  for consumption here or from the node daemon.
225

226
  @rtype: tuple
227
  @return: master_netdev, master_ip, master_name
228
  @raise RPCFail: in case of errors
229

230
  """
231
  try:
232
    cfg = _GetConfig()
233
    master_netdev = cfg.GetMasterNetdev()
234
    master_ip = cfg.GetMasterIP()
235
    master_node = cfg.GetMasterNode()
236
  except errors.ConfigurationError, err:
237
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
238
  return (master_netdev, master_ip, master_node)
239

    
240

    
241
def StartMaster(start_daemons, no_voting):
242
  """Activate local node as master node.
243

244
  The function will always try activate the IP address of the master
245
  (unless someone else has it). It will also start the master daemons,
246
  based on the start_daemons parameter.
247

248
  @type start_daemons: boolean
249
  @param start_daemons: whether to also start the master
250
      daemons (ganeti-masterd and ganeti-rapi)
251
  @type no_voting: boolean
252
  @param no_voting: whether to start ganeti-masterd without a node vote
253
      (if start_daemons is True), but still non-interactively
254
  @rtype: None
255

256
  """
257
  # GetMasterInfo will raise an exception if not able to return data
258
  master_netdev, master_ip, _ = GetMasterInfo()
259

    
260
  err_msgs = []
261
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
262
    if utils.OwnIpAddress(master_ip):
263
      # we already have the ip:
264
      logging.debug("Master IP already configured, doing nothing")
265
    else:
266
      msg = "Someone else has the master ip, not activating"
267
      logging.error(msg)
268
      err_msgs.append(msg)
269
  else:
270
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
271
                           "dev", master_netdev, "label",
272
                           "%s:0" % master_netdev])
273
    if result.failed:
274
      msg = "Can't activate master IP: %s" % result.output
275
      logging.error(msg)
276
      err_msgs.append(msg)
277

    
278
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
279
                           "-s", master_ip, master_ip])
280
    # we'll ignore the exit code of arping
281

    
282
  # and now start the master and rapi daemons
283
  if start_daemons:
284
    if no_voting:
285
      masterd_args = "--no-voting --yes-do-it"
286
    else:
287
      masterd_args = ""
288

    
289
    env = {
290
      "EXTRA_MASTERD_ARGS": masterd_args,
291
      }
292

    
293
    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
294
    if result.failed:
295
      msg = "Can't start Ganeti master: %s" % result.output
296
      logging.error(msg)
297
      err_msgs.append(msg)
298

    
299
  if err_msgs:
300
    _Fail("; ".join(err_msgs))
301

    
302

    
303
def StopMaster(stop_daemons):
304
  """Deactivate this node as master.
305

306
  The function will always try to deactivate the IP address of the
307
  master. It will also stop the master daemons depending on the
308
  stop_daemons parameter.
309

310
  @type stop_daemons: boolean
311
  @param stop_daemons: whether to also stop the master daemons
312
      (ganeti-masterd and ganeti-rapi)
313
  @rtype: None
314

315
  """
316
  # TODO: log and report back to the caller the error failures; we
317
  # need to decide in which case we fail the RPC for this
318

    
319
  # GetMasterInfo will raise an exception if not able to return data
320
  master_netdev, master_ip, _ = GetMasterInfo()
321

    
322
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
323
                         "dev", master_netdev])
324
  if result.failed:
325
    logging.error("Can't remove the master IP, error: %s", result.output)
326
    # but otherwise ignore the failure
327

    
328
  if stop_daemons:
329
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
330
    if result.failed:
331
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
332
                    " and error %s",
333
                    result.cmd, result.exit_code, result.output)
334

    
335

    
336
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
337
  """Joins this node to the cluster.
338

339
  This does the following:
340
      - updates the hostkeys of the machine (rsa and dsa)
341
      - adds the ssh private key to the user
342
      - adds the ssh public key to the users' authorized_keys file
343

344
  @type dsa: str
345
  @param dsa: the DSA private key to write
346
  @type dsapub: str
347
  @param dsapub: the DSA public key to write
348
  @type rsa: str
349
  @param rsa: the RSA private key to write
350
  @type rsapub: str
351
  @param rsapub: the RSA public key to write
352
  @type sshkey: str
353
  @param sshkey: the SSH private key to write
354
  @type sshpub: str
355
  @param sshpub: the SSH public key to write
356
  @rtype: boolean
357
  @return: the success of the operation
358

359
  """
360
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
361
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
362
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
363
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
364
  for name, content, mode in sshd_keys:
365
    utils.WriteFile(name, data=content, mode=mode)
366

    
367
  try:
368
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
369
                                                    mkdir=True)
370
  except errors.OpExecError, err:
371
    _Fail("Error while processing user ssh files: %s", err, exc=True)
372

    
373
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
374
    utils.WriteFile(name, data=content, mode=0600)
375

    
376
  utils.AddAuthorizedKey(auth_keys, sshpub)
377

    
378
  result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
379
  if result.failed:
380
    _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
381
          result.cmd, result.exit_code, result.output)
382

    
383

    
384
def LeaveCluster(modify_ssh_setup):
385
  """Cleans up and remove the current node.
386

387
  This function cleans up and prepares the current node to be removed
388
  from the cluster.
389

390
  If processing is successful, then it raises an
391
  L{errors.QuitGanetiException} which is used as a special case to
392
  shutdown the node daemon.
393

394
  @param modify_ssh_setup: boolean
395

396
  """
397
  _CleanDirectory(constants.DATA_DIR)
398
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
399
  JobQueuePurge()
400

    
401
  if modify_ssh_setup:
402
    try:
403
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
404

    
405
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
406

    
407
      utils.RemoveFile(priv_key)
408
      utils.RemoveFile(pub_key)
409
    except errors.OpExecError:
410
      logging.exception("Error while processing ssh files")
411

    
412
  try:
413
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
414
    utils.RemoveFile(constants.RAPI_CERT_FILE)
415
    utils.RemoveFile(constants.NODED_CERT_FILE)
416
  except: # pylint: disable-msg=W0702
417
    logging.exception("Error while removing cluster secrets")
418

    
419
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
420
  if result.failed:
421
    logging.error("Command %s failed with exitcode %s and error %s",
422
                  result.cmd, result.exit_code, result.output)
423

    
424
  # Raise a custom exception (handled in ganeti-noded)
425
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
426

    
427

    
428
def GetNodeInfo(vgname, hypervisor_type):
429
  """Gives back a hash with different information about the node.
430

431
  @type vgname: C{string}
432
  @param vgname: the name of the volume group to ask for disk space information
433
  @type hypervisor_type: C{str}
434
  @param hypervisor_type: the name of the hypervisor to ask for
435
      memory information
436
  @rtype: C{dict}
437
  @return: dictionary with the following keys:
438
      - vg_size is the size of the configured volume group in MiB
439
      - vg_free is the free size of the volume group in MiB
440
      - memory_dom0 is the memory allocated for domain0 in MiB
441
      - memory_free is the currently available (free) ram in MiB
442
      - memory_total is the total number of ram in MiB
443

444
  """
445
  outputarray = {}
446
  vginfo = _GetVGInfo(vgname)
447
  outputarray['vg_size'] = vginfo['vg_size']
448
  outputarray['vg_free'] = vginfo['vg_free']
449

    
450
  hyper = hypervisor.GetHypervisor(hypervisor_type)
451
  hyp_info = hyper.GetNodeInfo()
452
  if hyp_info is not None:
453
    outputarray.update(hyp_info)
454

    
455
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
456

    
457
  return outputarray
458

    
459

    
460
def VerifyNode(what, cluster_name):
461
  """Verify the status of the local node.
462

463
  Based on the input L{what} parameter, various checks are done on the
464
  local node.
465

466
  If the I{filelist} key is present, this list of
467
  files is checksummed and the file/checksum pairs are returned.
468

469
  If the I{nodelist} key is present, we check that we have
470
  connectivity via ssh with the target nodes (and check the hostname
471
  report).
472

473
  If the I{node-net-test} key is present, we check that we have
474
  connectivity to the given nodes via both primary IP and, if
475
  applicable, secondary IPs.
476

477
  @type what: C{dict}
478
  @param what: a dictionary of things to check:
479
      - filelist: list of files for which to compute checksums
480
      - nodelist: list of nodes we should check ssh communication with
481
      - node-net-test: list of nodes we should check node daemon port
482
        connectivity with
483
      - hypervisor: list with hypervisors to run the verify for
484
  @rtype: dict
485
  @return: a dictionary with the same keys as the input dict, and
486
      values representing the result of the checks
487

488
  """
489
  result = {}
490
  my_name = utils.HostInfo().name
491
  port = utils.GetDaemonPort(constants.NODED)
492

    
493
  if constants.NV_HYPERVISOR in what:
494
    result[constants.NV_HYPERVISOR] = tmp = {}
495
    for hv_name in what[constants.NV_HYPERVISOR]:
496
      try:
497
        val = hypervisor.GetHypervisor(hv_name).Verify()
498
      except errors.HypervisorError, err:
499
        val = "Error while checking hypervisor: %s" % str(err)
500
      tmp[hv_name] = val
501

    
502
  if constants.NV_FILELIST in what:
503
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
504
      what[constants.NV_FILELIST])
505

    
506
  if constants.NV_NODELIST in what:
507
    result[constants.NV_NODELIST] = tmp = {}
508
    random.shuffle(what[constants.NV_NODELIST])
509
    for node in what[constants.NV_NODELIST]:
510
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
511
      if not success:
512
        tmp[node] = message
513

    
514
  if constants.NV_NODENETTEST in what:
515
    result[constants.NV_NODENETTEST] = tmp = {}
516
    my_pip = my_sip = None
517
    for name, pip, sip in what[constants.NV_NODENETTEST]:
518
      if name == my_name:
519
        my_pip = pip
520
        my_sip = sip
521
        break
522
    if not my_pip:
523
      tmp[my_name] = ("Can't find my own primary/secondary IP"
524
                      " in the node list")
525
    else:
526
      for name, pip, sip in what[constants.NV_NODENETTEST]:
527
        fail = []
528
        if not utils.TcpPing(pip, port, source=my_pip):
529
          fail.append("primary")
530
        if sip != pip:
531
          if not utils.TcpPing(sip, port, source=my_sip):
532
            fail.append("secondary")
533
        if fail:
534
          tmp[name] = ("failure using the %s interface(s)" %
535
                       " and ".join(fail))
536

    
537
  if constants.NV_MASTERIP in what:
538
    # FIXME: add checks on incoming data structures (here and in the
539
    # rest of the function)
540
    master_name, master_ip = what[constants.NV_MASTERIP]
541
    if master_name == my_name:
542
      source = constants.LOCALHOST_IP_ADDRESS
543
    else:
544
      source = None
545
    result[constants.NV_MASTERIP] = utils.TcpPing(master_ip, port,
546
                                                  source=source)
547

    
548
  if constants.NV_LVLIST in what:
549
    try:
550
      val = GetVolumeList(what[constants.NV_LVLIST])
551
    except RPCFail, err:
552
      val = str(err)
553
    result[constants.NV_LVLIST] = val
554

    
555
  if constants.NV_INSTANCELIST in what:
556
    # GetInstanceList can fail
557
    try:
558
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
559
    except RPCFail, err:
560
      val = str(err)
561
    result[constants.NV_INSTANCELIST] = val
562

    
563
  if constants.NV_VGLIST in what:
564
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
565

    
566
  if constants.NV_PVLIST in what:
567
    result[constants.NV_PVLIST] = \
568
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
569
                                   filter_allocatable=False)
570

    
571
  if constants.NV_VERSION in what:
572
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
573
                                    constants.RELEASE_VERSION)
574

    
575
  if constants.NV_HVINFO in what:
576
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
577
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
578

    
579
  if constants.NV_DRBDLIST in what:
580
    try:
581
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
582
    except errors.BlockDeviceError, err:
583
      logging.warning("Can't get used minors list", exc_info=True)
584
      used_minors = str(err)
585
    result[constants.NV_DRBDLIST] = used_minors
586

    
587
  if constants.NV_NODESETUP in what:
588
    result[constants.NV_NODESETUP] = tmpr = []
589
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
590
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
591
                  " under /sys, missing required directories /sys/block"
592
                  " and /sys/class/net")
593
    if (not os.path.isdir("/proc/sys") or
594
        not os.path.isfile("/proc/sysrq-trigger")):
595
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
596
                  " under /proc, missing required directory /proc/sys and"
597
                  " the file /proc/sysrq-trigger")
598

    
599
  if constants.NV_TIME in what:
600
    result[constants.NV_TIME] = utils.SplitTime(time.time())
601

    
602
  return result
603

    
604

    
605
def GetVolumeList(vg_name):
606
  """Compute list of logical volumes and their size.
607

608
  @type vg_name: str
609
  @param vg_name: the volume group whose LVs we should list
610
  @rtype: dict
611
  @return:
612
      dictionary of all partions (key) with value being a tuple of
613
      their size (in MiB), inactive and online status::
614

615
        {'test1': ('20.06', True, True)}
616

617
      in case of errors, a string is returned with the error
618
      details.
619

620
  """
621
  lvs = {}
622
  sep = '|'
623
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
624
                         "--separator=%s" % sep,
625
                         "-olv_name,lv_size,lv_attr", vg_name])
626
  if result.failed:
627
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
628

    
629
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
630
  for line in result.stdout.splitlines():
631
    line = line.strip()
632
    match = valid_line_re.match(line)
633
    if not match:
634
      logging.error("Invalid line returned from lvs output: '%s'", line)
635
      continue
636
    name, size, attr = match.groups()
637
    inactive = attr[4] == '-'
638
    online = attr[5] == 'o'
639
    virtual = attr[0] == 'v'
640
    if virtual:
641
      # we don't want to report such volumes as existing, since they
642
      # don't really hold data
643
      continue
644
    lvs[name] = (size, inactive, online)
645

    
646
  return lvs
647

    
648

    
649
def ListVolumeGroups():
650
  """List the volume groups and their size.
651

652
  @rtype: dict
653
  @return: dictionary with keys volume name and values the
654
      size of the volume
655

656
  """
657
  return utils.ListVolumeGroups()
658

    
659

    
660
def NodeVolumes():
661
  """List all volumes on this node.
662

663
  @rtype: list
664
  @return:
665
    A list of dictionaries, each having four keys:
666
      - name: the logical volume name,
667
      - size: the size of the logical volume
668
      - dev: the physical device on which the LV lives
669
      - vg: the volume group to which it belongs
670

671
    In case of errors, we return an empty list and log the
672
    error.
673

674
    Note that since a logical volume can live on multiple physical
675
    volumes, the resulting list might include a logical volume
676
    multiple times.
677

678
  """
679
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
680
                         "--separator=|",
681
                         "--options=lv_name,lv_size,devices,vg_name"])
682
  if result.failed:
683
    _Fail("Failed to list logical volumes, lvs output: %s",
684
          result.output)
685

    
686
  def parse_dev(dev):
687
    return dev.split('(')[0]
688

    
689
  def handle_dev(dev):
690
    return [parse_dev(x) for x in dev.split(",")]
691

    
692
  def map_line(line):
693
    line = [v.strip() for v in line]
694
    return [{'name': line[0], 'size': line[1],
695
             'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
696

    
697
  all_devs = []
698
  for line in result.stdout.splitlines():
699
    if line.count('|') >= 3:
700
      all_devs.extend(map_line(line.split('|')))
701
    else:
702
      logging.warning("Strange line in the output from lvs: '%s'", line)
703
  return all_devs
704

    
705

    
706
def BridgesExist(bridges_list):
707
  """Check if a list of bridges exist on the current node.
708

709
  @rtype: boolean
710
  @return: C{True} if all of them exist, C{False} otherwise
711

712
  """
713
  missing = []
714
  for bridge in bridges_list:
715
    if not utils.BridgeExists(bridge):
716
      missing.append(bridge)
717

    
718
  if missing:
719
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
720

    
721

    
722
def GetInstanceList(hypervisor_list):
723
  """Provides a list of instances.
724

725
  @type hypervisor_list: list
726
  @param hypervisor_list: the list of hypervisors to query information
727

728
  @rtype: list
729
  @return: a list of all running instances on the current node
730
    - instance1.example.com
731
    - instance2.example.com
732

733
  """
734
  results = []
735
  for hname in hypervisor_list:
736
    try:
737
      names = hypervisor.GetHypervisor(hname).ListInstances()
738
      results.extend(names)
739
    except errors.HypervisorError, err:
740
      _Fail("Error enumerating instances (hypervisor %s): %s",
741
            hname, err, exc=True)
742

    
743
  return results
744

    
745

    
746
def GetInstanceInfo(instance, hname):
747
  """Gives back the information about an instance as a dictionary.
748

749
  @type instance: string
750
  @param instance: the instance name
751
  @type hname: string
752
  @param hname: the hypervisor type of the instance
753

754
  @rtype: dict
755
  @return: dictionary with the following keys:
756
      - memory: memory size of instance (int)
757
      - state: xen state of instance (string)
758
      - time: cpu time of instance (float)
759

760
  """
761
  output = {}
762

    
763
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
764
  if iinfo is not None:
765
    output['memory'] = iinfo[2]
766
    output['state'] = iinfo[4]
767
    output['time'] = iinfo[5]
768

    
769
  return output
770

    
771

    
772
def GetInstanceMigratable(instance):
773
  """Gives whether an instance can be migrated.
774

775
  @type instance: L{objects.Instance}
776
  @param instance: object representing the instance to be checked.
777

778
  @rtype: tuple
779
  @return: tuple of (result, description) where:
780
      - result: whether the instance can be migrated or not
781
      - description: a description of the issue, if relevant
782

783
  """
784
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
785
  iname = instance.name
786
  if iname not in hyper.ListInstances():
787
    _Fail("Instance %s is not running", iname)
788

    
789
  for idx in range(len(instance.disks)):
790
    link_name = _GetBlockDevSymlinkPath(iname, idx)
791
    if not os.path.islink(link_name):
792
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
793

    
794

    
795
def GetAllInstancesInfo(hypervisor_list):
796
  """Gather data about all instances.
797

798
  This is the equivalent of L{GetInstanceInfo}, except that it
799
  computes data for all instances at once, thus being faster if one
800
  needs data about more than one instance.
801

802
  @type hypervisor_list: list
803
  @param hypervisor_list: list of hypervisors to query for instance data
804

805
  @rtype: dict
806
  @return: dictionary of instance: data, with data having the following keys:
807
      - memory: memory size of instance (int)
808
      - state: xen state of instance (string)
809
      - time: cpu time of instance (float)
810
      - vcpus: the number of vcpus
811

812
  """
813
  output = {}
814

    
815
  for hname in hypervisor_list:
816
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
817
    if iinfo:
818
      for name, _, memory, vcpus, state, times in iinfo:
819
        value = {
820
          'memory': memory,
821
          'vcpus': vcpus,
822
          'state': state,
823
          'time': times,
824
          }
825
        if name in output:
826
          # we only check static parameters, like memory and vcpus,
827
          # and not state and time which can change between the
828
          # invocations of the different hypervisors
829
          for key in 'memory', 'vcpus':
830
            if value[key] != output[name][key]:
831
              _Fail("Instance %s is running twice"
832
                    " with different parameters", name)
833
        output[name] = value
834

    
835
  return output
836

    
837

    
838
def _InstanceLogName(kind, os_name, instance):
839
  """Compute the OS log filename for a given instance and operation.
840

841
  The instance name and os name are passed in as strings since not all
842
  operations have these as part of an instance object.
843

844
  @type kind: string
845
  @param kind: the operation type (e.g. add, import, etc.)
846
  @type os_name: string
847
  @param os_name: the os name
848
  @type instance: string
849
  @param instance: the name of the instance being imported/added/etc.
850

851
  """
852
  # TODO: Use tempfile.mkstemp to create unique filename
853
  base = ("%s-%s-%s-%s.log" %
854
          (kind, os_name, instance, utils.TimestampForFilename()))
855
  return utils.PathJoin(constants.LOG_OS_DIR, base)
856

    
857

    
858
def InstanceOsAdd(instance, reinstall, debug):
859
  """Add an OS to an instance.
860

861
  @type instance: L{objects.Instance}
862
  @param instance: Instance whose OS is to be installed
863
  @type reinstall: boolean
864
  @param reinstall: whether this is an instance reinstall
865
  @type debug: integer
866
  @param debug: debug level, passed to the OS scripts
867
  @rtype: None
868

869
  """
870
  inst_os = OSFromDisk(instance.os)
871

    
872
  create_env = OSEnvironment(instance, inst_os, debug)
873
  if reinstall:
874
    create_env['INSTANCE_REINSTALL'] = "1"
875

    
876
  logfile = _InstanceLogName("add", instance.os, instance.name)
877

    
878
  result = utils.RunCmd([inst_os.create_script], env=create_env,
879
                        cwd=inst_os.path, output=logfile,)
880
  if result.failed:
881
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
882
                  " output: %s", result.cmd, result.fail_reason, logfile,
883
                  result.output)
884
    lines = [utils.SafeEncode(val)
885
             for val in utils.TailFile(logfile, lines=20)]
886
    _Fail("OS create script failed (%s), last lines in the"
887
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
888

    
889

    
890
def RunRenameInstance(instance, old_name, debug):
891
  """Run the OS rename script for an instance.
892

893
  @type instance: L{objects.Instance}
894
  @param instance: Instance whose OS is to be installed
895
  @type old_name: string
896
  @param old_name: previous instance name
897
  @type debug: integer
898
  @param debug: debug level, passed to the OS scripts
899
  @rtype: boolean
900
  @return: the success of the operation
901

902
  """
903
  inst_os = OSFromDisk(instance.os)
904

    
905
  rename_env = OSEnvironment(instance, inst_os, debug)
906
  rename_env['OLD_INSTANCE_NAME'] = old_name
907

    
908
  logfile = _InstanceLogName("rename", instance.os,
909
                             "%s-%s" % (old_name, instance.name))
910

    
911
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
912
                        cwd=inst_os.path, output=logfile)
913

    
914
  if result.failed:
915
    logging.error("os create command '%s' returned error: %s output: %s",
916
                  result.cmd, result.fail_reason, result.output)
917
    lines = [utils.SafeEncode(val)
918
             for val in utils.TailFile(logfile, lines=20)]
919
    _Fail("OS rename script failed (%s), last lines in the"
920
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
921

    
922

    
923
def _GetVGInfo(vg_name):
924
  """Get information about the volume group.
925

926
  @type vg_name: str
927
  @param vg_name: the volume group which we query
928
  @rtype: dict
929
  @return:
930
    A dictionary with the following keys:
931
      - C{vg_size} is the total size of the volume group in MiB
932
      - C{vg_free} is the free size of the volume group in MiB
933
      - C{pv_count} are the number of physical disks in that VG
934

935
    If an error occurs during gathering of data, we return the same dict
936
    with keys all set to None.
937

938
  """
939
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
940

    
941
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
942
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
943

    
944
  if retval.failed:
945
    logging.error("volume group %s not present", vg_name)
946
    return retdic
947
  valarr = retval.stdout.strip().rstrip(':').split(':')
948
  if len(valarr) == 3:
949
    try:
950
      retdic = {
951
        "vg_size": int(round(float(valarr[0]), 0)),
952
        "vg_free": int(round(float(valarr[1]), 0)),
953
        "pv_count": int(valarr[2]),
954
        }
955
    except (TypeError, ValueError), err:
956
      logging.exception("Fail to parse vgs output: %s", err)
957
  else:
958
    logging.error("vgs output has the wrong number of fields (expected"
959
                  " three): %s", str(valarr))
960
  return retdic
961

    
962

    
963
def _GetBlockDevSymlinkPath(instance_name, idx):
964
  return utils.PathJoin(constants.DISK_LINKS_DIR,
965
                        "%s:%d" % (instance_name, idx))
966

    
967

    
968
def _SymlinkBlockDev(instance_name, device_path, idx):
969
  """Set up symlinks to a instance's block device.
970

971
  This is an auxiliary function run when an instance is start (on the primary
972
  node) or when an instance is migrated (on the target node).
973

974

975
  @param instance_name: the name of the target instance
976
  @param device_path: path of the physical block device, on the node
977
  @param idx: the disk index
978
  @return: absolute path to the disk's symlink
979

980
  """
981
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
982
  try:
983
    os.symlink(device_path, link_name)
984
  except OSError, err:
985
    if err.errno == errno.EEXIST:
986
      if (not os.path.islink(link_name) or
987
          os.readlink(link_name) != device_path):
988
        os.remove(link_name)
989
        os.symlink(device_path, link_name)
990
    else:
991
      raise
992

    
993
  return link_name
994

    
995

    
996
def _RemoveBlockDevLinks(instance_name, disks):
997
  """Remove the block device symlinks belonging to the given instance.
998

999
  """
1000
  for idx, _ in enumerate(disks):
1001
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1002
    if os.path.islink(link_name):
1003
      try:
1004
        os.remove(link_name)
1005
      except OSError:
1006
        logging.exception("Can't remove symlink '%s'", link_name)
1007

    
1008

    
1009
def _GatherAndLinkBlockDevs(instance):
1010
  """Set up an instance's block device(s).
1011

1012
  This is run on the primary node at instance startup. The block
1013
  devices must be already assembled.
1014

1015
  @type instance: L{objects.Instance}
1016
  @param instance: the instance whose disks we shoul assemble
1017
  @rtype: list
1018
  @return: list of (disk_object, device_path)
1019

1020
  """
1021
  block_devices = []
1022
  for idx, disk in enumerate(instance.disks):
1023
    device = _RecursiveFindBD(disk)
1024
    if device is None:
1025
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1026
                                    str(disk))
1027
    device.Open()
1028
    try:
1029
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1030
    except OSError, e:
1031
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1032
                                    e.strerror)
1033

    
1034
    block_devices.append((disk, link_name))
1035

    
1036
  return block_devices
1037

    
1038

    
1039
def StartInstance(instance):
1040
  """Start an instance.
1041

1042
  @type instance: L{objects.Instance}
1043
  @param instance: the instance object
1044
  @rtype: None
1045

1046
  """
1047
  running_instances = GetInstanceList([instance.hypervisor])
1048

    
1049
  if instance.name in running_instances:
1050
    logging.info("Instance %s already running, not starting", instance.name)
1051
    return
1052

    
1053
  try:
1054
    block_devices = _GatherAndLinkBlockDevs(instance)
1055
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1056
    hyper.StartInstance(instance, block_devices)
1057
  except errors.BlockDeviceError, err:
1058
    _Fail("Block device error: %s", err, exc=True)
1059
  except errors.HypervisorError, err:
1060
    _RemoveBlockDevLinks(instance.name, instance.disks)
1061
    _Fail("Hypervisor error: %s", err, exc=True)
1062

    
1063

    
1064
def InstanceShutdown(instance, timeout):
1065
  """Shut an instance down.
1066

1067
  @note: this functions uses polling with a hardcoded timeout.
1068

1069
  @type instance: L{objects.Instance}
1070
  @param instance: the instance object
1071
  @type timeout: integer
1072
  @param timeout: maximum timeout for soft shutdown
1073
  @rtype: None
1074

1075
  """
1076
  hv_name = instance.hypervisor
1077
  hyper = hypervisor.GetHypervisor(hv_name)
1078
  iname = instance.name
1079

    
1080
  if instance.name not in hyper.ListInstances():
1081
    logging.info("Instance %s not running, doing nothing", iname)
1082
    return
1083

    
1084
  class _TryShutdown:
1085
    def __init__(self):
1086
      self.tried_once = False
1087

    
1088
    def __call__(self):
1089
      if iname not in hyper.ListInstances():
1090
        return
1091

    
1092
      try:
1093
        hyper.StopInstance(instance, retry=self.tried_once)
1094
      except errors.HypervisorError, err:
1095
        if iname not in hyper.ListInstances():
1096
          # if the instance is no longer existing, consider this a
1097
          # success and go to cleanup
1098
          return
1099

    
1100
        _Fail("Failed to stop instance %s: %s", iname, err)
1101

    
1102
      self.tried_once = True
1103

    
1104
      raise utils.RetryAgain()
1105

    
1106
  try:
1107
    utils.Retry(_TryShutdown(), 5, timeout)
1108
  except utils.RetryTimeout:
1109
    # the shutdown did not succeed
1110
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1111

    
1112
    try:
1113
      hyper.StopInstance(instance, force=True)
1114
    except errors.HypervisorError, err:
1115
      if iname in hyper.ListInstances():
1116
        # only raise an error if the instance still exists, otherwise
1117
        # the error could simply be "instance ... unknown"!
1118
        _Fail("Failed to force stop instance %s: %s", iname, err)
1119

    
1120
    time.sleep(1)
1121

    
1122
    if iname in hyper.ListInstances():
1123
      _Fail("Could not shutdown instance %s even by destroy", iname)
1124

    
1125
  try:
1126
    hyper.CleanupInstance(instance.name)
1127
  except errors.HypervisorError, err:
1128
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1129

    
1130
  _RemoveBlockDevLinks(iname, instance.disks)
1131

    
1132

    
1133
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1134
  """Reboot an instance.
1135

1136
  @type instance: L{objects.Instance}
1137
  @param instance: the instance object to reboot
1138
  @type reboot_type: str
1139
  @param reboot_type: the type of reboot, one the following
1140
    constants:
1141
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1142
        instance OS, do not recreate the VM
1143
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1144
        restart the VM (at the hypervisor level)
1145
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1146
        not accepted here, since that mode is handled differently, in
1147
        cmdlib, and translates into full stop and start of the
1148
        instance (instead of a call_instance_reboot RPC)
1149
  @type shutdown_timeout: integer
1150
  @param shutdown_timeout: maximum timeout for soft shutdown
1151
  @rtype: None
1152

1153
  """
1154
  running_instances = GetInstanceList([instance.hypervisor])
1155

    
1156
  if instance.name not in running_instances:
1157
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1158

    
1159
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1160
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1161
    try:
1162
      hyper.RebootInstance(instance)
1163
    except errors.HypervisorError, err:
1164
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1165
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1166
    try:
1167
      InstanceShutdown(instance, shutdown_timeout)
1168
      return StartInstance(instance)
1169
    except errors.HypervisorError, err:
1170
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1171
  else:
1172
    _Fail("Invalid reboot_type received: %s", reboot_type)
1173

    
1174

    
1175
def MigrationInfo(instance):
1176
  """Gather information about an instance to be migrated.
1177

1178
  @type instance: L{objects.Instance}
1179
  @param instance: the instance definition
1180

1181
  """
1182
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1183
  try:
1184
    info = hyper.MigrationInfo(instance)
1185
  except errors.HypervisorError, err:
1186
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1187
  return info
1188

    
1189

    
1190
def AcceptInstance(instance, info, target):
1191
  """Prepare the node to accept an instance.
1192

1193
  @type instance: L{objects.Instance}
1194
  @param instance: the instance definition
1195
  @type info: string/data (opaque)
1196
  @param info: migration information, from the source node
1197
  @type target: string
1198
  @param target: target host (usually ip), on this node
1199

1200
  """
1201
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1202
  try:
1203
    hyper.AcceptInstance(instance, info, target)
1204
  except errors.HypervisorError, err:
1205
    _Fail("Failed to accept instance: %s", err, exc=True)
1206

    
1207

    
1208
def FinalizeMigration(instance, info, success):
1209
  """Finalize any preparation to accept an instance.
1210

1211
  @type instance: L{objects.Instance}
1212
  @param instance: the instance definition
1213
  @type info: string/data (opaque)
1214
  @param info: migration information, from the source node
1215
  @type success: boolean
1216
  @param success: whether the migration was a success or a failure
1217

1218
  """
1219
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1220
  try:
1221
    hyper.FinalizeMigration(instance, info, success)
1222
  except errors.HypervisorError, err:
1223
    _Fail("Failed to finalize migration: %s", err, exc=True)
1224

    
1225

    
1226
def MigrateInstance(instance, target, live):
1227
  """Migrates an instance to another node.
1228

1229
  @type instance: L{objects.Instance}
1230
  @param instance: the instance definition
1231
  @type target: string
1232
  @param target: the target node name
1233
  @type live: boolean
1234
  @param live: whether the migration should be done live or not (the
1235
      interpretation of this parameter is left to the hypervisor)
1236
  @rtype: tuple
1237
  @return: a tuple of (success, msg) where:
1238
      - succes is a boolean denoting the success/failure of the operation
1239
      - msg is a string with details in case of failure
1240

1241
  """
1242
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1243

    
1244
  try:
1245
    hyper.MigrateInstance(instance, target, live)
1246
  except errors.HypervisorError, err:
1247
    _Fail("Failed to migrate instance: %s", err, exc=True)
1248

    
1249

    
1250
def BlockdevCreate(disk, size, owner, on_primary, info):
1251
  """Creates a block device for an instance.
1252

1253
  @type disk: L{objects.Disk}
1254
  @param disk: the object describing the disk we should create
1255
  @type size: int
1256
  @param size: the size of the physical underlying device, in MiB
1257
  @type owner: str
1258
  @param owner: the name of the instance for which disk is created,
1259
      used for device cache data
1260
  @type on_primary: boolean
1261
  @param on_primary:  indicates if it is the primary node or not
1262
  @type info: string
1263
  @param info: string that will be sent to the physical device
1264
      creation, used for example to set (LVM) tags on LVs
1265

1266
  @return: the new unique_id of the device (this can sometime be
1267
      computed only after creation), or None. On secondary nodes,
1268
      it's not required to return anything.
1269

1270
  """
1271
  # TODO: remove the obsolete 'size' argument
1272
  # pylint: disable-msg=W0613
1273
  clist = []
1274
  if disk.children:
1275
    for child in disk.children:
1276
      try:
1277
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1278
      except errors.BlockDeviceError, err:
1279
        _Fail("Can't assemble device %s: %s", child, err)
1280
      if on_primary or disk.AssembleOnSecondary():
1281
        # we need the children open in case the device itself has to
1282
        # be assembled
1283
        try:
1284
          # pylint: disable-msg=E1103
1285
          crdev.Open()
1286
        except errors.BlockDeviceError, err:
1287
          _Fail("Can't make child '%s' read-write: %s", child, err)
1288
      clist.append(crdev)
1289

    
1290
  try:
1291
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1292
  except errors.BlockDeviceError, err:
1293
    _Fail("Can't create block device: %s", err)
1294

    
1295
  if on_primary or disk.AssembleOnSecondary():
1296
    try:
1297
      device.Assemble()
1298
    except errors.BlockDeviceError, err:
1299
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1300
    device.SetSyncSpeed(constants.SYNC_SPEED)
1301
    if on_primary or disk.OpenOnSecondary():
1302
      try:
1303
        device.Open(force=True)
1304
      except errors.BlockDeviceError, err:
1305
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1306
    DevCacheManager.UpdateCache(device.dev_path, owner,
1307
                                on_primary, disk.iv_name)
1308

    
1309
  device.SetInfo(info)
1310

    
1311
  return device.unique_id
1312

    
1313

    
1314
def BlockdevRemove(disk):
1315
  """Remove a block device.
1316

1317
  @note: This is intended to be called recursively.
1318

1319
  @type disk: L{objects.Disk}
1320
  @param disk: the disk object we should remove
1321
  @rtype: boolean
1322
  @return: the success of the operation
1323

1324
  """
1325
  msgs = []
1326
  try:
1327
    rdev = _RecursiveFindBD(disk)
1328
  except errors.BlockDeviceError, err:
1329
    # probably can't attach
1330
    logging.info("Can't attach to device %s in remove", disk)
1331
    rdev = None
1332
  if rdev is not None:
1333
    r_path = rdev.dev_path
1334
    try:
1335
      rdev.Remove()
1336
    except errors.BlockDeviceError, err:
1337
      msgs.append(str(err))
1338
    if not msgs:
1339
      DevCacheManager.RemoveCache(r_path)
1340

    
1341
  if disk.children:
1342
    for child in disk.children:
1343
      try:
1344
        BlockdevRemove(child)
1345
      except RPCFail, err:
1346
        msgs.append(str(err))
1347

    
1348
  if msgs:
1349
    _Fail("; ".join(msgs))
1350

    
1351

    
1352
def _RecursiveAssembleBD(disk, owner, as_primary):
1353
  """Activate a block device for an instance.
1354

1355
  This is run on the primary and secondary nodes for an instance.
1356

1357
  @note: this function is called recursively.
1358

1359
  @type disk: L{objects.Disk}
1360
  @param disk: the disk we try to assemble
1361
  @type owner: str
1362
  @param owner: the name of the instance which owns the disk
1363
  @type as_primary: boolean
1364
  @param as_primary: if we should make the block device
1365
      read/write
1366

1367
  @return: the assembled device or None (in case no device
1368
      was assembled)
1369
  @raise errors.BlockDeviceError: in case there is an error
1370
      during the activation of the children or the device
1371
      itself
1372

1373
  """
1374
  children = []
1375
  if disk.children:
1376
    mcn = disk.ChildrenNeeded()
1377
    if mcn == -1:
1378
      mcn = 0 # max number of Nones allowed
1379
    else:
1380
      mcn = len(disk.children) - mcn # max number of Nones
1381
    for chld_disk in disk.children:
1382
      try:
1383
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1384
      except errors.BlockDeviceError, err:
1385
        if children.count(None) >= mcn:
1386
          raise
1387
        cdev = None
1388
        logging.error("Error in child activation (but continuing): %s",
1389
                      str(err))
1390
      children.append(cdev)
1391

    
1392
  if as_primary or disk.AssembleOnSecondary():
1393
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1394
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1395
    result = r_dev
1396
    if as_primary or disk.OpenOnSecondary():
1397
      r_dev.Open()
1398
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1399
                                as_primary, disk.iv_name)
1400

    
1401
  else:
1402
    result = True
1403
  return result
1404

    
1405

    
1406
def BlockdevAssemble(disk, owner, as_primary):
1407
  """Activate a block device for an instance.
1408

1409
  This is a wrapper over _RecursiveAssembleBD.
1410

1411
  @rtype: str or boolean
1412
  @return: a C{/dev/...} path for primary nodes, and
1413
      C{True} for secondary nodes
1414

1415
  """
1416
  try:
1417
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1418
    if isinstance(result, bdev.BlockDev):
1419
      # pylint: disable-msg=E1103
1420
      result = result.dev_path
1421
  except errors.BlockDeviceError, err:
1422
    _Fail("Error while assembling disk: %s", err, exc=True)
1423

    
1424
  return result
1425

    
1426

    
1427
def BlockdevShutdown(disk):
1428
  """Shut down a block device.
1429

1430
  First, if the device is assembled (Attach() is successful), then
1431
  the device is shutdown. Then the children of the device are
1432
  shutdown.
1433

1434
  This function is called recursively. Note that we don't cache the
1435
  children or such, as oppossed to assemble, shutdown of different
1436
  devices doesn't require that the upper device was active.
1437

1438
  @type disk: L{objects.Disk}
1439
  @param disk: the description of the disk we should
1440
      shutdown
1441
  @rtype: None
1442

1443
  """
1444
  msgs = []
1445
  r_dev = _RecursiveFindBD(disk)
1446
  if r_dev is not None:
1447
    r_path = r_dev.dev_path
1448
    try:
1449
      r_dev.Shutdown()
1450
      DevCacheManager.RemoveCache(r_path)
1451
    except errors.BlockDeviceError, err:
1452
      msgs.append(str(err))
1453

    
1454
  if disk.children:
1455
    for child in disk.children:
1456
      try:
1457
        BlockdevShutdown(child)
1458
      except RPCFail, err:
1459
        msgs.append(str(err))
1460

    
1461
  if msgs:
1462
    _Fail("; ".join(msgs))
1463

    
1464

    
1465
def BlockdevAddchildren(parent_cdev, new_cdevs):
1466
  """Extend a mirrored block device.
1467

1468
  @type parent_cdev: L{objects.Disk}
1469
  @param parent_cdev: the disk to which we should add children
1470
  @type new_cdevs: list of L{objects.Disk}
1471
  @param new_cdevs: the list of children which we should add
1472
  @rtype: None
1473

1474
  """
1475
  parent_bdev = _RecursiveFindBD(parent_cdev)
1476
  if parent_bdev is None:
1477
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
1478
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1479
  if new_bdevs.count(None) > 0:
1480
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1481
  parent_bdev.AddChildren(new_bdevs)
1482

    
1483

    
1484
def BlockdevRemovechildren(parent_cdev, new_cdevs):
1485
  """Shrink a mirrored block device.
1486

1487
  @type parent_cdev: L{objects.Disk}
1488
  @param parent_cdev: the disk from which we should remove children
1489
  @type new_cdevs: list of L{objects.Disk}
1490
  @param new_cdevs: the list of children which we should remove
1491
  @rtype: None
1492

1493
  """
1494
  parent_bdev = _RecursiveFindBD(parent_cdev)
1495
  if parent_bdev is None:
1496
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1497
  devs = []
1498
  for disk in new_cdevs:
1499
    rpath = disk.StaticDevPath()
1500
    if rpath is None:
1501
      bd = _RecursiveFindBD(disk)
1502
      if bd is None:
1503
        _Fail("Can't find device %s while removing children", disk)
1504
      else:
1505
        devs.append(bd.dev_path)
1506
    else:
1507
      if not utils.IsNormAbsPath(rpath):
1508
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1509
      devs.append(rpath)
1510
  parent_bdev.RemoveChildren(devs)
1511

    
1512

    
1513
def BlockdevGetmirrorstatus(disks):
1514
  """Get the mirroring status of a list of devices.
1515

1516
  @type disks: list of L{objects.Disk}
1517
  @param disks: the list of disks which we should query
1518
  @rtype: disk
1519
  @return:
1520
      a list of (mirror_done, estimated_time) tuples, which
1521
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
1522
  @raise errors.BlockDeviceError: if any of the disks cannot be
1523
      found
1524

1525
  """
1526
  stats = []
1527
  for dsk in disks:
1528
    rbd = _RecursiveFindBD(dsk)
1529
    if rbd is None:
1530
      _Fail("Can't find device %s", dsk)
1531

    
1532
    stats.append(rbd.CombinedSyncStatus())
1533

    
1534
  return stats
1535

    
1536

    
1537
def _RecursiveFindBD(disk):
1538
  """Check if a device is activated.
1539

1540
  If so, return information about the real device.
1541

1542
  @type disk: L{objects.Disk}
1543
  @param disk: the disk object we need to find
1544

1545
  @return: None if the device can't be found,
1546
      otherwise the device instance
1547

1548
  """
1549
  children = []
1550
  if disk.children:
1551
    for chdisk in disk.children:
1552
      children.append(_RecursiveFindBD(chdisk))
1553

    
1554
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1555

    
1556

    
1557
def _OpenRealBD(disk):
1558
  """Opens the underlying block device of a disk.
1559

1560
  @type disk: L{objects.Disk}
1561
  @param disk: the disk object we want to open
1562

1563
  """
1564
  real_disk = _RecursiveFindBD(disk)
1565
  if real_disk is None:
1566
    _Fail("Block device '%s' is not set up", disk)
1567

    
1568
  real_disk.Open()
1569

    
1570
  return real_disk
1571

    
1572

    
1573
def BlockdevFind(disk):
1574
  """Check if a device is activated.
1575

1576
  If it is, return information about the real device.
1577

1578
  @type disk: L{objects.Disk}
1579
  @param disk: the disk to find
1580
  @rtype: None or objects.BlockDevStatus
1581
  @return: None if the disk cannot be found, otherwise a the current
1582
           information
1583

1584
  """
1585
  try:
1586
    rbd = _RecursiveFindBD(disk)
1587
  except errors.BlockDeviceError, err:
1588
    _Fail("Failed to find device: %s", err, exc=True)
1589

    
1590
  if rbd is None:
1591
    return None
1592

    
1593
  return rbd.GetSyncStatus()
1594

    
1595

    
1596
def BlockdevGetsize(disks):
1597
  """Computes the size of the given disks.
1598

1599
  If a disk is not found, returns None instead.
1600

1601
  @type disks: list of L{objects.Disk}
1602
  @param disks: the list of disk to compute the size for
1603
  @rtype: list
1604
  @return: list with elements None if the disk cannot be found,
1605
      otherwise the size
1606

1607
  """
1608
  result = []
1609
  for cf in disks:
1610
    try:
1611
      rbd = _RecursiveFindBD(cf)
1612
    except errors.BlockDeviceError:
1613
      result.append(None)
1614
      continue
1615
    if rbd is None:
1616
      result.append(None)
1617
    else:
1618
      result.append(rbd.GetActualSize())
1619
  return result
1620

    
1621

    
1622
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1623
  """Export a block device to a remote node.
1624

1625
  @type disk: L{objects.Disk}
1626
  @param disk: the description of the disk to export
1627
  @type dest_node: str
1628
  @param dest_node: the destination node to export to
1629
  @type dest_path: str
1630
  @param dest_path: the destination path on the target node
1631
  @type cluster_name: str
1632
  @param cluster_name: the cluster name, needed for SSH hostalias
1633
  @rtype: None
1634

1635
  """
1636
  real_disk = _OpenRealBD(disk)
1637

    
1638
  # the block size on the read dd is 1MiB to match our units
1639
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1640
                               "dd if=%s bs=1048576 count=%s",
1641
                               real_disk.dev_path, str(disk.size))
1642

    
1643
  # we set here a smaller block size as, due to ssh buffering, more
1644
  # than 64-128k will mostly ignored; we use nocreat to fail if the
1645
  # device is not already there or we pass a wrong path; we use
1646
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1647
  # to not buffer too much memory; this means that at best, we flush
1648
  # every 64k, which will not be very fast
1649
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1650
                                " oflag=dsync", dest_path)
1651

    
1652
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1653
                                                   constants.GANETI_RUNAS,
1654
                                                   destcmd)
1655

    
1656
  # all commands have been checked, so we're safe to combine them
1657
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1658

    
1659
  result = utils.RunCmd(["bash", "-c", command])
1660

    
1661
  if result.failed:
1662
    _Fail("Disk copy command '%s' returned error: %s"
1663
          " output: %s", command, result.fail_reason, result.output)
1664

    
1665

    
1666
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1667
  """Write a file to the filesystem.
1668

1669
  This allows the master to overwrite(!) a file. It will only perform
1670
  the operation if the file belongs to a list of configuration files.
1671

1672
  @type file_name: str
1673
  @param file_name: the target file name
1674
  @type data: str
1675
  @param data: the new contents of the file
1676
  @type mode: int
1677
  @param mode: the mode to give the file (can be None)
1678
  @type uid: int
1679
  @param uid: the owner of the file (can be -1 for default)
1680
  @type gid: int
1681
  @param gid: the group of the file (can be -1 for default)
1682
  @type atime: float
1683
  @param atime: the atime to set on the file (can be None)
1684
  @type mtime: float
1685
  @param mtime: the mtime to set on the file (can be None)
1686
  @rtype: None
1687

1688
  """
1689
  if not os.path.isabs(file_name):
1690
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1691

    
1692
  if file_name not in _ALLOWED_UPLOAD_FILES:
1693
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1694
          file_name)
1695

    
1696
  raw_data = _Decompress(data)
1697

    
1698
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1699
                  atime=atime, mtime=mtime)
1700

    
1701

    
1702
def WriteSsconfFiles(values):
1703
  """Update all ssconf files.
1704

1705
  Wrapper around the SimpleStore.WriteFiles.
1706

1707
  """
1708
  ssconf.SimpleStore().WriteFiles(values)
1709

    
1710

    
1711
def _ErrnoOrStr(err):
1712
  """Format an EnvironmentError exception.
1713

1714
  If the L{err} argument has an errno attribute, it will be looked up
1715
  and converted into a textual C{E...} description. Otherwise the
1716
  string representation of the error will be returned.
1717

1718
  @type err: L{EnvironmentError}
1719
  @param err: the exception to format
1720

1721
  """
1722
  if hasattr(err, 'errno'):
1723
    detail = errno.errorcode[err.errno]
1724
  else:
1725
    detail = str(err)
1726
  return detail
1727

    
1728

    
1729
def _OSOndiskAPIVersion(os_dir):
1730
  """Compute and return the API version of a given OS.
1731

1732
  This function will try to read the API version of the OS residing in
1733
  the 'os_dir' directory.
1734

1735
  @type os_dir: str
1736
  @param os_dir: the directory in which we should look for the OS
1737
  @rtype: tuple
1738
  @return: tuple (status, data) with status denoting the validity and
1739
      data holding either the vaid versions or an error message
1740

1741
  """
1742
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1743

    
1744
  try:
1745
    st = os.stat(api_file)
1746
  except EnvironmentError, err:
1747
    return False, ("Required file '%s' not found under path %s: %s" %
1748
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1749

    
1750
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1751
    return False, ("File '%s' in %s is not a regular file" %
1752
                   (constants.OS_API_FILE, os_dir))
1753

    
1754
  try:
1755
    api_versions = utils.ReadFile(api_file).splitlines()
1756
  except EnvironmentError, err:
1757
    return False, ("Error while reading the API version file at %s: %s" %
1758
                   (api_file, _ErrnoOrStr(err)))
1759

    
1760
  try:
1761
    api_versions = [int(version.strip()) for version in api_versions]
1762
  except (TypeError, ValueError), err:
1763
    return False, ("API version(s) can't be converted to integer: %s" %
1764
                   str(err))
1765

    
1766
  return True, api_versions
1767

    
1768

    
1769
def DiagnoseOS(top_dirs=None):
1770
  """Compute the validity for all OSes.
1771

1772
  @type top_dirs: list
1773
  @param top_dirs: the list of directories in which to
1774
      search (if not given defaults to
1775
      L{constants.OS_SEARCH_PATH})
1776
  @rtype: list of L{objects.OS}
1777
  @return: a list of tuples (name, path, status, diagnose, variants)
1778
      for all (potential) OSes under all search paths, where:
1779
          - name is the (potential) OS name
1780
          - path is the full path to the OS
1781
          - status True/False is the validity of the OS
1782
          - diagnose is the error message for an invalid OS, otherwise empty
1783
          - variants is a list of supported OS variants, if any
1784
          - parameters is a list of (name, help) parameters, if any
1785

1786
  """
1787
  if top_dirs is None:
1788
    top_dirs = constants.OS_SEARCH_PATH
1789

    
1790
  result = []
1791
  for dir_name in top_dirs:
1792
    if os.path.isdir(dir_name):
1793
      try:
1794
        f_names = utils.ListVisibleFiles(dir_name)
1795
      except EnvironmentError, err:
1796
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1797
        break
1798
      for name in f_names:
1799
        os_path = utils.PathJoin(dir_name, name)
1800
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1801
        if status:
1802
          diagnose = ""
1803
          variants = os_inst.supported_variants
1804
          parameters = os_inst.supported_parameters
1805
        else:
1806
          diagnose = os_inst
1807
          variants = parameters = []
1808
        result.append((name, os_path, status, diagnose, variants, parameters))
1809

    
1810
  return result
1811

    
1812

    
1813
def _TryOSFromDisk(name, base_dir=None):
1814
  """Create an OS instance from disk.
1815

1816
  This function will return an OS instance if the given name is a
1817
  valid OS name.
1818

1819
  @type base_dir: string
1820
  @keyword base_dir: Base directory containing OS installations.
1821
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1822
  @rtype: tuple
1823
  @return: success and either the OS instance if we find a valid one,
1824
      or error message
1825

1826
  """
1827
  if base_dir is None:
1828
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1829
  else:
1830
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1831

    
1832
  if os_dir is None:
1833
    return False, "Directory for OS %s not found in search path" % name
1834

    
1835
  status, api_versions = _OSOndiskAPIVersion(os_dir)
1836
  if not status:
1837
    # push the error up
1838
    return status, api_versions
1839

    
1840
  if not constants.OS_API_VERSIONS.intersection(api_versions):
1841
    return False, ("API version mismatch for path '%s': found %s, want %s." %
1842
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
1843

    
1844
  # OS Files dictionary, we will populate it with the absolute path names
1845
  os_files = dict.fromkeys(constants.OS_SCRIPTS)
1846

    
1847
  if max(api_versions) >= constants.OS_API_V15:
1848
    os_files[constants.OS_VARIANTS_FILE] = ''
1849

    
1850
  if max(api_versions) >= constants.OS_API_V20:
1851
    os_files[constants.OS_PARAMETERS_FILE] = ''
1852
  else:
1853
    del os_files[constants.OS_SCRIPT_VERIFY]
1854

    
1855
  for filename in os_files:
1856
    os_files[filename] = utils.PathJoin(os_dir, filename)
1857

    
1858
    try:
1859
      st = os.stat(os_files[filename])
1860
    except EnvironmentError, err:
1861
      return False, ("File '%s' under path '%s' is missing (%s)" %
1862
                     (filename, os_dir, _ErrnoOrStr(err)))
1863

    
1864
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1865
      return False, ("File '%s' under path '%s' is not a regular file" %
1866
                     (filename, os_dir))
1867

    
1868
    if filename in constants.OS_SCRIPTS:
1869
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1870
        return False, ("File '%s' under path '%s' is not executable" %
1871
                       (filename, os_dir))
1872

    
1873
  variants = []
1874
  if constants.OS_VARIANTS_FILE in os_files:
1875
    variants_file = os_files[constants.OS_VARIANTS_FILE]
1876
    try:
1877
      variants = utils.ReadFile(variants_file).splitlines()
1878
    except EnvironmentError, err:
1879
      return False, ("Error while reading the OS variants file at %s: %s" %
1880
                     (variants_file, _ErrnoOrStr(err)))
1881
    if not variants:
1882
      return False, ("No supported os variant found")
1883

    
1884
  parameters = []
1885
  if constants.OS_PARAMETERS_FILE in os_files:
1886
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1887
    try:
1888
      parameters = utils.ReadFile(parameters_file).splitlines()
1889
    except EnvironmentError, err:
1890
      return False, ("Error while reading the OS parameters file at %s: %s" %
1891
                     (parameters_file, _ErrnoOrStr(err)))
1892
    parameters = [v.split(None, 1) for v in parameters]
1893

    
1894
  os_obj = objects.OS(name=name, path=os_dir,
1895
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
1896
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
1897
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
1898
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
1899
                      verify_script=os_files[constants.OS_SCRIPT_VERIFY],
1900
                      supported_variants=variants,
1901
                      supported_parameters=parameters,
1902
                      api_versions=api_versions)
1903
  return True, os_obj
1904

    
1905

    
1906
def OSFromDisk(name, base_dir=None):
1907
  """Create an OS instance from disk.
1908

1909
  This function will return an OS instance if the given name is a
1910
  valid OS name. Otherwise, it will raise an appropriate
1911
  L{RPCFail} exception, detailing why this is not a valid OS.
1912

1913
  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1914
  an exception but returns true/false status data.
1915

1916
  @type base_dir: string
1917
  @keyword base_dir: Base directory containing OS installations.
1918
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1919
  @rtype: L{objects.OS}
1920
  @return: the OS instance if we find a valid one
1921
  @raise RPCFail: if we don't find a valid OS
1922

1923
  """
1924
  name_only = name.split("+", 1)[0]
1925
  status, payload = _TryOSFromDisk(name_only, base_dir)
1926

    
1927
  if not status:
1928
    _Fail(payload)
1929

    
1930
  return payload
1931

    
1932

    
1933
def OSCoreEnv(inst_os, debug=0):
1934
  """Calculate the basic environment for an os script.
1935

1936
  @type inst_os: L{objects.OS}
1937
  @param inst_os: operating system for which the environment is being built
1938
  @type debug: integer
1939
  @param debug: debug level (0 or 1, for OS Api 10)
1940
  @rtype: dict
1941
  @return: dict of environment variables
1942
  @raise errors.BlockDeviceError: if the block device
1943
      cannot be found
1944

1945
  """
1946
  result = {}
1947
  api_version = \
1948
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1949
  result['OS_API_VERSION'] = '%d' % api_version
1950
  result['OS_NAME'] = inst_os.name
1951
  result['DEBUG_LEVEL'] = '%d' % debug
1952

    
1953
  # OS variants
1954
  if api_version >= constants.OS_API_V15:
1955
    try:
1956
      variant = inst_os.name.split('+', 1)[1]
1957
    except IndexError:
1958
      variant = inst_os.supported_variants[0]
1959
    result['OS_VARIANT'] = variant
1960

    
1961
  return result
1962

    
1963

    
1964
def OSEnvironment(instance, inst_os, debug=0):
1965
  """Calculate the environment for an os script.
1966

1967
  @type instance: L{objects.Instance}
1968
  @param instance: target instance for the os script run
1969
  @type inst_os: L{objects.OS}
1970
  @param inst_os: operating system for which the environment is being built
1971
  @type debug: integer
1972
  @param debug: debug level (0 or 1, for OS Api 10)
1973
  @rtype: dict
1974
  @return: dict of environment variables
1975
  @raise errors.BlockDeviceError: if the block device
1976
      cannot be found
1977

1978
  """
1979
  result = OSCoreEnv(inst_os, debug)
1980

    
1981
  result['INSTANCE_NAME'] = instance.name
1982
  result['INSTANCE_OS'] = instance.os
1983
  result['HYPERVISOR'] = instance.hypervisor
1984
  result['DISK_COUNT'] = '%d' % len(instance.disks)
1985
  result['NIC_COUNT'] = '%d' % len(instance.nics)
1986

    
1987
  # Disks
1988
  for idx, disk in enumerate(instance.disks):
1989
    real_disk = _OpenRealBD(disk)
1990
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
1991
    result['DISK_%d_ACCESS' % idx] = disk.mode
1992
    if constants.HV_DISK_TYPE in instance.hvparams:
1993
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
1994
        instance.hvparams[constants.HV_DISK_TYPE]
1995
    if disk.dev_type in constants.LDS_BLOCK:
1996
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1997
    elif disk.dev_type == constants.LD_FILE:
1998
      result['DISK_%d_BACKEND_TYPE' % idx] = \
1999
        'file:%s' % disk.physical_id[0]
2000

    
2001
  # NICs
2002
  for idx, nic in enumerate(instance.nics):
2003
    result['NIC_%d_MAC' % idx] = nic.mac
2004
    if nic.ip:
2005
      result['NIC_%d_IP' % idx] = nic.ip
2006
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2007
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2008
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2009
    if nic.nicparams[constants.NIC_LINK]:
2010
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2011
    if constants.HV_NIC_TYPE in instance.hvparams:
2012
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
2013
        instance.hvparams[constants.HV_NIC_TYPE]
2014

    
2015
  # HV/BE params
2016
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2017
    for key, value in source.items():
2018
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2019

    
2020
  return result
2021

    
2022

    
2023
def BlockdevGrow(disk, amount):
2024
  """Grow a stack of block devices.
2025

2026
  This function is called recursively, with the childrens being the
2027
  first ones to resize.
2028

2029
  @type disk: L{objects.Disk}
2030
  @param disk: the disk to be grown
2031
  @rtype: (status, result)
2032
  @return: a tuple with the status of the operation
2033
      (True/False), and the errors message if status
2034
      is False
2035

2036
  """
2037
  r_dev = _RecursiveFindBD(disk)
2038
  if r_dev is None:
2039
    _Fail("Cannot find block device %s", disk)
2040

    
2041
  try:
2042
    r_dev.Grow(amount)
2043
  except errors.BlockDeviceError, err:
2044
    _Fail("Failed to grow block device: %s", err, exc=True)
2045

    
2046

    
2047
def BlockdevSnapshot(disk):
2048
  """Create a snapshot copy of a block device.
2049

2050
  This function is called recursively, and the snapshot is actually created
2051
  just for the leaf lvm backend device.
2052

2053
  @type disk: L{objects.Disk}
2054
  @param disk: the disk to be snapshotted
2055
  @rtype: string
2056
  @return: snapshot disk path
2057

2058
  """
2059
  if disk.dev_type == constants.LD_DRBD8:
2060
    if not disk.children:
2061
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2062
            disk.unique_id)
2063
    return BlockdevSnapshot(disk.children[0])
2064
  elif disk.dev_type == constants.LD_LV:
2065
    r_dev = _RecursiveFindBD(disk)
2066
    if r_dev is not None:
2067
      # FIXME: choose a saner value for the snapshot size
2068
      # let's stay on the safe side and ask for the full size, for now
2069
      return r_dev.Snapshot(disk.size)
2070
    else:
2071
      _Fail("Cannot find block device %s", disk)
2072
  else:
2073
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2074
          disk.unique_id, disk.dev_type)
2075

    
2076

    
2077
def FinalizeExport(instance, snap_disks):
2078
  """Write out the export configuration information.
2079

2080
  @type instance: L{objects.Instance}
2081
  @param instance: the instance which we export, used for
2082
      saving configuration
2083
  @type snap_disks: list of L{objects.Disk}
2084
  @param snap_disks: list of snapshot block devices, which
2085
      will be used to get the actual name of the dump file
2086

2087
  @rtype: None
2088

2089
  """
2090
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2091
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2092

    
2093
  config = objects.SerializableConfigParser()
2094

    
2095
  config.add_section(constants.INISECT_EXP)
2096
  config.set(constants.INISECT_EXP, 'version', '0')
2097
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2098
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2099
  config.set(constants.INISECT_EXP, 'os', instance.os)
2100
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
2101

    
2102
  config.add_section(constants.INISECT_INS)
2103
  config.set(constants.INISECT_INS, 'name', instance.name)
2104
  config.set(constants.INISECT_INS, 'memory', '%d' %
2105
             instance.beparams[constants.BE_MEMORY])
2106
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
2107
             instance.beparams[constants.BE_VCPUS])
2108
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2109
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2110

    
2111
  nic_total = 0
2112
  for nic_count, nic in enumerate(instance.nics):
2113
    nic_total += 1
2114
    config.set(constants.INISECT_INS, 'nic%d_mac' %
2115
               nic_count, '%s' % nic.mac)
2116
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2117
    for param in constants.NICS_PARAMETER_TYPES:
2118
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2119
                 '%s' % nic.nicparams.get(param, None))
2120
  # TODO: redundant: on load can read nics until it doesn't exist
2121
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2122

    
2123
  disk_total = 0
2124
  for disk_count, disk in enumerate(snap_disks):
2125
    if disk:
2126
      disk_total += 1
2127
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2128
                 ('%s' % disk.iv_name))
2129
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2130
                 ('%s' % disk.physical_id[1]))
2131
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2132
                 ('%d' % disk.size))
2133

    
2134
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2135

    
2136
  # New-style hypervisor/backend parameters
2137

    
2138
  config.add_section(constants.INISECT_HYP)
2139
  for name, value in instance.hvparams.items():
2140
    if name not in constants.HVC_GLOBALS:
2141
      config.set(constants.INISECT_HYP, name, str(value))
2142

    
2143
  config.add_section(constants.INISECT_BEP)
2144
  for name, value in instance.beparams.items():
2145
    config.set(constants.INISECT_BEP, name, str(value))
2146

    
2147
  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2148
                  data=config.Dumps())
2149
  shutil.rmtree(finaldestdir, ignore_errors=True)
2150
  shutil.move(destdir, finaldestdir)
2151

    
2152

    
2153
def ExportInfo(dest):
2154
  """Get export configuration information.
2155

2156
  @type dest: str
2157
  @param dest: directory containing the export
2158

2159
  @rtype: L{objects.SerializableConfigParser}
2160
  @return: a serializable config file containing the
2161
      export info
2162

2163
  """
2164
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2165

    
2166
  config = objects.SerializableConfigParser()
2167
  config.read(cff)
2168

    
2169
  if (not config.has_section(constants.INISECT_EXP) or
2170
      not config.has_section(constants.INISECT_INS)):
2171
    _Fail("Export info file doesn't have the required fields")
2172

    
2173
  return config.Dumps()
2174

    
2175

    
2176
def ListExports():
2177
  """Return a list of exports currently available on this machine.
2178

2179
  @rtype: list
2180
  @return: list of the exports
2181

2182
  """
2183
  if os.path.isdir(constants.EXPORT_DIR):
2184
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2185
  else:
2186
    _Fail("No exports directory")
2187

    
2188

    
2189
def RemoveExport(export):
2190
  """Remove an existing export from the node.
2191

2192
  @type export: str
2193
  @param export: the name of the export to remove
2194
  @rtype: None
2195

2196
  """
2197
  target = utils.PathJoin(constants.EXPORT_DIR, export)
2198

    
2199
  try:
2200
    shutil.rmtree(target)
2201
  except EnvironmentError, err:
2202
    _Fail("Error while removing the export: %s", err, exc=True)
2203

    
2204

    
2205
def BlockdevRename(devlist):
2206
  """Rename a list of block devices.
2207

2208
  @type devlist: list of tuples
2209
  @param devlist: list of tuples of the form  (disk,
2210
      new_logical_id, new_physical_id); disk is an
2211
      L{objects.Disk} object describing the current disk,
2212
      and new logical_id/physical_id is the name we
2213
      rename it to
2214
  @rtype: boolean
2215
  @return: True if all renames succeeded, False otherwise
2216

2217
  """
2218
  msgs = []
2219
  result = True
2220
  for disk, unique_id in devlist:
2221
    dev = _RecursiveFindBD(disk)
2222
    if dev is None:
2223
      msgs.append("Can't find device %s in rename" % str(disk))
2224
      result = False
2225
      continue
2226
    try:
2227
      old_rpath = dev.dev_path
2228
      dev.Rename(unique_id)
2229
      new_rpath = dev.dev_path
2230
      if old_rpath != new_rpath:
2231
        DevCacheManager.RemoveCache(old_rpath)
2232
        # FIXME: we should add the new cache information here, like:
2233
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2234
        # but we don't have the owner here - maybe parse from existing
2235
        # cache? for now, we only lose lvm data when we rename, which
2236
        # is less critical than DRBD or MD
2237
    except errors.BlockDeviceError, err:
2238
      msgs.append("Can't rename device '%s' to '%s': %s" %
2239
                  (dev, unique_id, err))
2240
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2241
      result = False
2242
  if not result:
2243
    _Fail("; ".join(msgs))
2244

    
2245

    
2246
def _TransformFileStorageDir(file_storage_dir):
2247
  """Checks whether given file_storage_dir is valid.
2248

2249
  Checks wheter the given file_storage_dir is within the cluster-wide
2250
  default file_storage_dir stored in SimpleStore. Only paths under that
2251
  directory are allowed.
2252

2253
  @type file_storage_dir: str
2254
  @param file_storage_dir: the path to check
2255

2256
  @return: the normalized path if valid, None otherwise
2257

2258
  """
2259
  if not constants.ENABLE_FILE_STORAGE:
2260
    _Fail("File storage disabled at configure time")
2261
  cfg = _GetConfig()
2262
  file_storage_dir = os.path.normpath(file_storage_dir)
2263
  base_file_storage_dir = cfg.GetFileStorageDir()
2264
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2265
      base_file_storage_dir):
2266
    _Fail("File storage directory '%s' is not under base file"
2267
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2268
  return file_storage_dir
2269

    
2270

    
2271
def CreateFileStorageDir(file_storage_dir):
2272
  """Create file storage directory.
2273

2274
  @type file_storage_dir: str
2275
  @param file_storage_dir: directory to create
2276

2277
  @rtype: tuple
2278
  @return: tuple with first element a boolean indicating wheter dir
2279
      creation was successful or not
2280

2281
  """
2282
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2283
  if os.path.exists(file_storage_dir):
2284
    if not os.path.isdir(file_storage_dir):
2285
      _Fail("Specified storage dir '%s' is not a directory",
2286
            file_storage_dir)
2287
  else:
2288
    try:
2289
      os.makedirs(file_storage_dir, 0750)
2290
    except OSError, err:
2291
      _Fail("Cannot create file storage directory '%s': %s",
2292
            file_storage_dir, err, exc=True)
2293

    
2294

    
2295
def RemoveFileStorageDir(file_storage_dir):
2296
  """Remove file storage directory.
2297

2298
  Remove it only if it's empty. If not log an error and return.
2299

2300
  @type file_storage_dir: str
2301
  @param file_storage_dir: the directory we should cleanup
2302
  @rtype: tuple (success,)
2303
  @return: tuple of one element, C{success}, denoting
2304
      whether the operation was successful
2305

2306
  """
2307
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2308
  if os.path.exists(file_storage_dir):
2309
    if not os.path.isdir(file_storage_dir):
2310
      _Fail("Specified Storage directory '%s' is not a directory",
2311
            file_storage_dir)
2312
    # deletes dir only if empty, otherwise we want to fail the rpc call
2313
    try:
2314
      os.rmdir(file_storage_dir)
2315
    except OSError, err:
2316
      _Fail("Cannot remove file storage directory '%s': %s",
2317
            file_storage_dir, err)
2318

    
2319

    
2320
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2321
  """Rename the file storage directory.
2322

2323
  @type old_file_storage_dir: str
2324
  @param old_file_storage_dir: the current path
2325
  @type new_file_storage_dir: str
2326
  @param new_file_storage_dir: the name we should rename to
2327
  @rtype: tuple (success,)
2328
  @return: tuple of one element, C{success}, denoting
2329
      whether the operation was successful
2330

2331
  """
2332
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2333
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2334
  if not os.path.exists(new_file_storage_dir):
2335
    if os.path.isdir(old_file_storage_dir):
2336
      try:
2337
        os.rename(old_file_storage_dir, new_file_storage_dir)
2338
      except OSError, err:
2339
        _Fail("Cannot rename '%s' to '%s': %s",
2340
              old_file_storage_dir, new_file_storage_dir, err)
2341
    else:
2342
      _Fail("Specified storage dir '%s' is not a directory",
2343
            old_file_storage_dir)
2344
  else:
2345
    if os.path.exists(old_file_storage_dir):
2346
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2347
            old_file_storage_dir, new_file_storage_dir)
2348

    
2349

    
2350
def _EnsureJobQueueFile(file_name):
2351
  """Checks whether the given filename is in the queue directory.
2352

2353
  @type file_name: str
2354
  @param file_name: the file name we should check
2355
  @rtype: None
2356
  @raises RPCFail: if the file is not valid
2357

2358
  """
2359
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
2360
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2361

    
2362
  if not result:
2363
    _Fail("Passed job queue file '%s' does not belong to"
2364
          " the queue directory '%s'", file_name, queue_dir)
2365

    
2366

    
2367
def JobQueueUpdate(file_name, content):
2368
  """Updates a file in the queue directory.
2369

2370
  This is just a wrapper over L{utils.WriteFile}, with proper
2371
  checking.
2372

2373
  @type file_name: str
2374
  @param file_name: the job file name
2375
  @type content: str
2376
  @param content: the new job contents
2377
  @rtype: boolean
2378
  @return: the success of the operation
2379

2380
  """
2381
  _EnsureJobQueueFile(file_name)
2382

    
2383
  # Write and replace the file atomically
2384
  utils.WriteFile(file_name, data=_Decompress(content))
2385

    
2386

    
2387
def JobQueueRename(old, new):
2388
  """Renames a job queue file.
2389

2390
  This is just a wrapper over os.rename with proper checking.
2391

2392
  @type old: str
2393
  @param old: the old (actual) file name
2394
  @type new: str
2395
  @param new: the desired file name
2396
  @rtype: tuple
2397
  @return: the success of the operation and payload
2398

2399
  """
2400
  _EnsureJobQueueFile(old)
2401
  _EnsureJobQueueFile(new)
2402

    
2403
  utils.RenameFile(old, new, mkdir=True)
2404

    
2405

    
2406
def BlockdevClose(instance_name, disks):
2407
  """Closes the given block devices.
2408

2409
  This means they will be switched to secondary mode (in case of
2410
  DRBD).
2411

2412
  @param instance_name: if the argument is not empty, the symlinks
2413
      of this instance will be removed
2414
  @type disks: list of L{objects.Disk}
2415
  @param disks: the list of disks to be closed
2416
  @rtype: tuple (success, message)
2417
  @return: a tuple of success and message, where success
2418
      indicates the succes of the operation, and message
2419
      which will contain the error details in case we
2420
      failed
2421

2422
  """
2423
  bdevs = []
2424
  for cf in disks:
2425
    rd = _RecursiveFindBD(cf)
2426
    if rd is None:
2427
      _Fail("Can't find device %s", cf)
2428
    bdevs.append(rd)
2429

    
2430
  msg = []
2431
  for rd in bdevs:
2432
    try:
2433
      rd.Close()
2434
    except errors.BlockDeviceError, err:
2435
      msg.append(str(err))
2436
  if msg:
2437
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2438
  else:
2439
    if instance_name:
2440
      _RemoveBlockDevLinks(instance_name, disks)
2441

    
2442

    
2443
def ValidateHVParams(hvname, hvparams):
2444
  """Validates the given hypervisor parameters.
2445

2446
  @type hvname: string
2447
  @param hvname: the hypervisor name
2448
  @type hvparams: dict
2449
  @param hvparams: the hypervisor parameters to be validated
2450
  @rtype: None
2451

2452
  """
2453
  try:
2454
    hv_type = hypervisor.GetHypervisor(hvname)
2455
    hv_type.ValidateParameters(hvparams)
2456
  except errors.HypervisorError, err:
2457
    _Fail(str(err), log=False)
2458

    
2459

    
2460
def _CheckOSPList(os_obj, parameters):
2461
  """Check whether a list of parameters is supported by the OS.
2462

2463
  @type os_obj: L{objects.OS}
2464
  @param os_obj: OS object to check
2465
  @type parameters: list
2466
  @param parameters: the list of parameters to check
2467

2468
  """
2469
  supported = [v[0] for v in os_obj.supported_parameters]
2470
  delta = frozenset(parameters).difference(supported)
2471
  if delta:
2472
    _Fail("The following parameters are not supported"
2473
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2474

    
2475

    
2476
def ValidateOS(required, osname, checks, osparams):
2477
  """Validate the given OS' parameters.
2478

2479
  @type required: boolean
2480
  @param required: whether absence of the OS should translate into
2481
      failure or not
2482
  @type osname: string
2483
  @param osname: the OS to be validated
2484
  @type checks: list
2485
  @param checks: list of the checks to run (currently only 'parameters')
2486
  @type osparams: dict
2487
  @param osparams: dictionary with OS parameters
2488
  @rtype: boolean
2489
  @return: True if the validation passed, or False if the OS was not
2490
      found and L{required} was false
2491

2492
  """
2493
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2494
    _Fail("Unknown checks required for OS %s: %s", osname,
2495
          set(checks).difference(constants.OS_VALIDATE_CALLS))
2496

    
2497
  name_only = osname.split("+", 1)[0]
2498
  status, tbv = _TryOSFromDisk(name_only, None)
2499

    
2500
  if not status:
2501
    if required:
2502
      _Fail(tbv)
2503
    else:
2504
      return False
2505

    
2506
  if constants.OS_VALIDATE_PARAMETERS in checks:
2507
    _CheckOSPList(tbv, osparams.keys())
2508

    
2509
  validate_env = OSCoreEnv(tbv, osparams)
2510
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2511
                        cwd=tbv.path)
2512
  if result.failed:
2513
    logging.error("os validate command '%s' returned error: %s output: %s",
2514
                  result.cmd, result.fail_reason, result.output)
2515
    _Fail("OS validation script failed (%s), output: %s",
2516
          result.fail_reason, result.output, log=False)
2517

    
2518
  return True
2519

    
2520

    
2521
def DemoteFromMC():
2522
  """Demotes the current node from master candidate role.
2523

2524
  """
2525
  # try to ensure we're not the master by mistake
2526
  master, myself = ssconf.GetMasterAndMyself()
2527
  if master == myself:
2528
    _Fail("ssconf status shows I'm the master node, will not demote")
2529

    
2530
  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2531
  if not result.failed:
2532
    _Fail("The master daemon is running, will not demote")
2533

    
2534
  try:
2535
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2536
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2537
  except EnvironmentError, err:
2538
    if err.errno != errno.ENOENT:
2539
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2540

    
2541
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2542

    
2543

    
2544
def _GetX509Filenames(cryptodir, name):
2545
  """Returns the full paths for the private key and certificate.
2546

2547
  """
2548
  return (utils.PathJoin(cryptodir, name),
2549
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2550
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2551

    
2552

    
2553
def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2554
  """Creates a new X509 certificate for SSL/TLS.
2555

2556
  @type validity: int
2557
  @param validity: Validity in seconds
2558
  @rtype: tuple; (string, string)
2559
  @return: Certificate name and public part
2560

2561
  """
2562
  (key_pem, cert_pem) = \
2563
    utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
2564
                                     min(validity, _MAX_SSL_CERT_VALIDITY))
2565

    
2566
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
2567
                              prefix="x509-%s-" % utils.TimestampForFilename())
2568
  try:
2569
    name = os.path.basename(cert_dir)
2570
    assert len(name) > 5
2571

    
2572
    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2573

    
2574
    utils.WriteFile(key_file, mode=0400, data=key_pem)
2575
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2576

    
2577
    # Never return private key as it shouldn't leave the node
2578
    return (name, cert_pem)
2579
  except Exception:
2580
    shutil.rmtree(cert_dir, ignore_errors=True)
2581
    raise
2582

    
2583

    
2584
def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2585
  """Removes a X509 certificate.
2586

2587
  @type name: string
2588
  @param name: Certificate name
2589

2590
  """
2591
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2592

    
2593
  utils.RemoveFile(key_file)
2594
  utils.RemoveFile(cert_file)
2595

    
2596
  try:
2597
    os.rmdir(cert_dir)
2598
  except EnvironmentError, err:
2599
    _Fail("Cannot remove certificate directory '%s': %s",
2600
          cert_dir, err)
2601

    
2602

    
2603
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2604
  """Returns the command for the requested input/output.
2605

2606
  @type instance: L{objects.Instance}
2607
  @param instance: The instance object
2608
  @param mode: Import/export mode
2609
  @param ieio: Input/output type
2610
  @param ieargs: Input/output arguments
2611

2612
  """
2613
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2614

    
2615
  env = None
2616
  prefix = None
2617
  suffix = None
2618
  exp_size = None
2619

    
2620
  if ieio == constants.IEIO_FILE:
2621
    (filename, ) = ieargs
2622

    
2623
    if not utils.IsNormAbsPath(filename):
2624
      _Fail("Path '%s' is not normalized or absolute", filename)
2625

    
2626
    directory = os.path.normpath(os.path.dirname(filename))
2627

    
2628
    if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2629
        constants.EXPORT_DIR):
2630
      _Fail("File '%s' is not under exports directory '%s'",
2631
            filename, constants.EXPORT_DIR)
2632

    
2633
    # Create directory
2634
    utils.Makedirs(directory, mode=0750)
2635

    
2636
    quoted_filename = utils.ShellQuote(filename)
2637

    
2638
    if mode == constants.IEM_IMPORT:
2639
      suffix = "> %s" % quoted_filename
2640
    elif mode == constants.IEM_EXPORT:
2641
      suffix = "< %s" % quoted_filename
2642

    
2643
      # Retrieve file size
2644
      try:
2645
        st = os.stat(filename)
2646
      except EnvironmentError, err:
2647
        logging.error("Can't stat(2) %s: %s", filename, err)
2648
      else:
2649
        exp_size = utils.BytesToMebibyte(st.st_size)
2650

    
2651
  elif ieio == constants.IEIO_RAW_DISK:
2652
    (disk, ) = ieargs
2653

    
2654
    real_disk = _OpenRealBD(disk)
2655

    
2656
    if mode == constants.IEM_IMPORT:
2657
      # we set here a smaller block size as, due to transport buffering, more
2658
      # than 64-128k will mostly ignored; we use nocreat to fail if the device
2659
      # is not already there or we pass a wrong path; we use notrunc to no
2660
      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2661
      # much memory; this means that at best, we flush every 64k, which will
2662
      # not be very fast
2663
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2664
                                    " bs=%s oflag=dsync"),
2665
                                    real_disk.dev_path,
2666
                                    str(64 * 1024))
2667

    
2668
    elif mode == constants.IEM_EXPORT:
2669
      # the block size on the read dd is 1MiB to match our units
2670
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2671
                                   real_disk.dev_path,
2672
                                   str(1024 * 1024), # 1 MB
2673
                                   str(disk.size))
2674
      exp_size = disk.size
2675

    
2676
  elif ieio == constants.IEIO_SCRIPT:
2677
    (disk, disk_index, ) = ieargs
2678

    
2679
    assert isinstance(disk_index, (int, long))
2680

    
2681
    real_disk = _OpenRealBD(disk)
2682

    
2683
    inst_os = OSFromDisk(instance.os)
2684
    env = OSEnvironment(instance, inst_os)
2685

    
2686
    if mode == constants.IEM_IMPORT:
2687
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2688
      env["IMPORT_INDEX"] = str(disk_index)
2689
      script = inst_os.import_script
2690

    
2691
    elif mode == constants.IEM_EXPORT:
2692
      env["EXPORT_DEVICE"] = real_disk.dev_path
2693
      env["EXPORT_INDEX"] = str(disk_index)
2694
      script = inst_os.export_script
2695

    
2696
    # TODO: Pass special environment only to script
2697
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2698

    
2699
    if mode == constants.IEM_IMPORT:
2700
      suffix = "| %s" % script_cmd
2701

    
2702
    elif mode == constants.IEM_EXPORT:
2703
      prefix = "%s |" % script_cmd
2704

    
2705
    # Let script predict size
2706
    exp_size = constants.IE_CUSTOM_SIZE
2707

    
2708
  else:
2709
    _Fail("Invalid %s I/O mode %r", mode, ieio)
2710

    
2711
  return (env, prefix, suffix, exp_size)
2712

    
2713

    
2714
def _CreateImportExportStatusDir(prefix):
2715
  """Creates status directory for import/export.
2716

2717
  """
2718
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2719
                          prefix=("%s-%s-" %
2720
                                  (prefix, utils.TimestampForFilename())))
2721

    
2722

    
2723
def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2724
  """Starts an import or export daemon.
2725

2726
  @param mode: Import/output mode
2727
  @type opts: L{objects.ImportExportOptions}
2728
  @param opts: Daemon options
2729
  @type host: string
2730
  @param host: Remote host for export (None for import)
2731
  @type port: int
2732
  @param port: Remote port for export (None for import)
2733
  @type instance: L{objects.Instance}
2734
  @param instance: Instance object
2735
  @param ieio: Input/output type
2736
  @param ieioargs: Input/output arguments
2737

2738
  """
2739
  if mode == constants.IEM_IMPORT:
2740
    prefix = "import"
2741

    
2742
    if not (host is None and port is None):
2743
      _Fail("Can not specify host or port on import")
2744

    
2745
  elif mode == constants.IEM_EXPORT:
2746
    prefix = "export"
2747

    
2748
    if host is None or port is None:
2749
      _Fail("Host and port must be specified for an export")
2750

    
2751
  else:
2752
    _Fail("Invalid mode %r", mode)
2753

    
2754
  if (opts.key_name is None) ^ (opts.ca_pem is None):
2755
    _Fail("Cluster certificate can only be used for both key and CA")
2756

    
2757
  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2758
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2759

    
2760
  if opts.key_name is None:
2761
    # Use server.pem
2762
    key_path = constants.NODED_CERT_FILE
2763
    cert_path = constants.NODED_CERT_FILE
2764
    assert opts.ca_pem is None
2765
  else:
2766
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2767
                                                 opts.key_name)
2768
    assert opts.ca_pem is not None
2769

    
2770
  for i in [key_path, cert_path]:
2771
    if not os.path.exists(i):
2772
      _Fail("File '%s' does not exist" % i)
2773

    
2774
  status_dir = _CreateImportExportStatusDir(prefix)
2775
  try:
2776
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2777
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2778
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2779

    
2780
    if opts.ca_pem is None:
2781
      # Use server.pem
2782
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
2783
    else:
2784
      ca = opts.ca_pem
2785

    
2786
    # Write CA file
2787
    utils.WriteFile(ca_file, data=ca, mode=0400)
2788

    
2789
    cmd = [
2790
      constants.IMPORT_EXPORT_DAEMON,
2791
      status_file, mode,
2792
      "--key=%s" % key_path,
2793
      "--cert=%s" % cert_path,
2794
      "--ca=%s" % ca_file,
2795
      ]
2796

    
2797
    if host:
2798
      cmd.append("--host=%s" % host)
2799

    
2800
    if port:
2801
      cmd.append("--port=%s" % port)
2802

    
2803
    if opts.compress:
2804
      cmd.append("--compress=%s" % opts.compress)
2805

    
2806
    if opts.magic:
2807
      cmd.append("--magic=%s" % opts.magic)
2808

    
2809
    if exp_size is not None:
2810
      cmd.append("--expected-size=%s" % exp_size)
2811

    
2812
    if cmd_prefix:
2813
      cmd.append("--cmd-prefix=%s" % cmd_prefix)
2814

    
2815
    if cmd_suffix:
2816
      cmd.append("--cmd-suffix=%s" % cmd_suffix)
2817

    
2818
    logfile = _InstanceLogName(prefix, instance.os, instance.name)
2819

    
2820
    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2821
    # support for receiving a file descriptor for output
2822
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2823
                      output=logfile)
2824

    
2825
    # The import/export name is simply the status directory name
2826
    return os.path.basename(status_dir)
2827

    
2828
  except Exception:
2829
    shutil.rmtree(status_dir, ignore_errors=True)
2830
    raise
2831

    
2832

    
2833
def GetImportExportStatus(names):
2834
  """Returns import/export daemon status.
2835

2836
  @type names: sequence
2837
  @param names: List of names
2838
  @rtype: List of dicts
2839
  @return: Returns a list of the state of each named import/export or None if a
2840
           status couldn't be read
2841

2842
  """
2843
  result = []
2844

    
2845
  for name in names:
2846
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2847
                                 _IES_STATUS_FILE)
2848

    
2849
    try:
2850
      data = utils.ReadFile(status_file)
2851
    except EnvironmentError, err:
2852
      if err.errno != errno.ENOENT:
2853
        raise
2854
      data = None
2855

    
2856
    if not data:
2857
      result.append(None)
2858
      continue
2859

    
2860
    result.append(serializer.LoadJson(data))
2861

    
2862
  return result
2863

    
2864

    
2865
def AbortImportExport(name):
2866
  """Sends SIGTERM to a running import/export daemon.
2867

2868
  """
2869
  logging.info("Abort import/export %s", name)
2870

    
2871
  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2872
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2873

    
2874
  if pid:
2875
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2876
                 name, pid)
2877
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2878

    
2879

    
2880
def CleanupImportExport(name):
2881
  """Cleanup after an import or export.
2882

2883
  If the import/export daemon is still running it's killed. Afterwards the
2884
  whole status directory is removed.
2885

2886
  """
2887
  logging.info("Finalizing import/export %s", name)
2888

    
2889
  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2890

    
2891
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2892

    
2893
  if pid:
2894
    logging.info("Import/export %s is still running with PID %s",
2895
                 name, pid)
2896
    utils.KillProcess(pid, waitpid=False)
2897

    
2898
  shutil.rmtree(status_dir, ignore_errors=True)
2899

    
2900

    
2901
def _FindDisks(nodes_ip, disks):
2902
  """Sets the physical ID on disks and returns the block devices.
2903

2904
  """
2905
  # set the correct physical ID
2906
  my_name = utils.HostInfo().name
2907
  for cf in disks:
2908
    cf.SetPhysicalID(my_name, nodes_ip)
2909

    
2910
  bdevs = []
2911

    
2912
  for cf in disks:
2913
    rd = _RecursiveFindBD(cf)
2914
    if rd is None:
2915
      _Fail("Can't find device %s", cf)
2916
    bdevs.append(rd)
2917
  return bdevs
2918

    
2919

    
2920
def DrbdDisconnectNet(nodes_ip, disks):
2921
  """Disconnects the network on a list of drbd devices.
2922

2923
  """
2924
  bdevs = _FindDisks(nodes_ip, disks)
2925

    
2926
  # disconnect disks
2927
  for rd in bdevs:
2928
    try:
2929
      rd.DisconnectNet()
2930
    except errors.BlockDeviceError, err:
2931
      _Fail("Can't change network configuration to standalone mode: %s",
2932
            err, exc=True)
2933

    
2934

    
2935
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2936
  """Attaches the network on a list of drbd devices.
2937

2938
  """
2939
  bdevs = _FindDisks(nodes_ip, disks)
2940

    
2941
  if multimaster:
2942
    for idx, rd in enumerate(bdevs):
2943
      try:
2944
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2945
      except EnvironmentError, err:
2946
        _Fail("Can't create symlink: %s", err)
2947
  # reconnect disks, switch to new master configuration and if
2948
  # needed primary mode
2949
  for rd in bdevs:
2950
    try:
2951
      rd.AttachNet(multimaster)
2952
    except errors.BlockDeviceError, err:
2953
      _Fail("Can't change network configuration: %s", err)
2954

    
2955
  # wait until the disks are connected; we need to retry the re-attach
2956
  # if the device becomes standalone, as this might happen if the one
2957
  # node disconnects and reconnects in a different mode before the
2958
  # other node reconnects; in this case, one or both of the nodes will
2959
  # decide it has wrong configuration and switch to standalone
2960

    
2961
  def _Attach():
2962
    all_connected = True
2963

    
2964
    for rd in bdevs:
2965
      stats = rd.GetProcStatus()
2966

    
2967
      all_connected = (all_connected and
2968
                       (stats.is_connected or stats.is_in_resync))
2969

    
2970
      if stats.is_standalone:
2971
        # peer had different config info and this node became
2972
        # standalone, even though this should not happen with the
2973
        # new staged way of changing disk configs
2974
        try:
2975
          rd.AttachNet(multimaster)
2976
        except errors.BlockDeviceError, err:
2977
          _Fail("Can't change network configuration: %s", err)
2978

    
2979
    if not all_connected:
2980
      raise utils.RetryAgain()
2981

    
2982
  try:
2983
    # Start with a delay of 100 miliseconds and go up to 5 seconds
2984
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2985
  except utils.RetryTimeout:
2986
    _Fail("Timeout in disk reconnecting")
2987

    
2988
  if multimaster:
2989
    # change to primary mode
2990
    for rd in bdevs:
2991
      try:
2992
        rd.Open()
2993
      except errors.BlockDeviceError, err:
2994
        _Fail("Can't change to primary mode: %s", err)
2995

    
2996

    
2997
def DrbdWaitSync(nodes_ip, disks):
2998
  """Wait until DRBDs have synchronized.
2999

3000
  """
3001
  def _helper(rd):
3002
    stats = rd.GetProcStatus()
3003
    if not (stats.is_connected or stats.is_in_resync):
3004
      raise utils.RetryAgain()
3005
    return stats
3006

    
3007
  bdevs = _FindDisks(nodes_ip, disks)
3008

    
3009
  min_resync = 100
3010
  alldone = True
3011
  for rd in bdevs:
3012
    try:
3013
      # poll each second for 15 seconds
3014
      stats = utils.Retry(_helper, 1, 15, args=[rd])
3015
    except utils.RetryTimeout:
3016
      stats = rd.GetProcStatus()
3017
      # last check
3018
      if not (stats.is_connected or stats.is_in_resync):
3019
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3020
    alldone = alldone and (not stats.is_in_resync)
3021
    if stats.sync_percent is not None:
3022
      min_resync = min(min_resync, stats.sync_percent)
3023

    
3024
  return (alldone, min_resync)
3025

    
3026

    
3027
def PowercycleNode(hypervisor_type):
3028
  """Hard-powercycle the node.
3029

3030
  Because we need to return first, and schedule the powercycle in the
3031
  background, we won't be able to report failures nicely.
3032

3033
  """
3034
  hyper = hypervisor.GetHypervisor(hypervisor_type)
3035
  try:
3036
    pid = os.fork()
3037
  except OSError:
3038
    # if we can't fork, we'll pretend that we're in the child process
3039
    pid = 0
3040
  if pid > 0:
3041
    return "Reboot scheduled in 5 seconds"
3042
  # ensure the child is running on ram
3043
  try:
3044
    utils.Mlockall()
3045
  except Exception: # pylint: disable-msg=W0703
3046
    pass
3047
  time.sleep(5)
3048
  hyper.PowercycleNode()
3049

    
3050

    
3051
class HooksRunner(object):
3052
  """Hook runner.
3053

3054
  This class is instantiated on the node side (ganeti-noded) and not
3055
  on the master side.
3056

3057
  """
3058
  def __init__(self, hooks_base_dir=None):
3059
    """Constructor for hooks runner.
3060

3061
    @type hooks_base_dir: str or None
3062
    @param hooks_base_dir: if not None, this overrides the
3063
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
3064

3065
    """
3066
    if hooks_base_dir is None:
3067
      hooks_base_dir = constants.HOOKS_BASE_DIR
3068
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
3069
    # constant
3070
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3071

    
3072
  def RunHooks(self, hpath, phase, env):
3073
    """Run the scripts in the hooks directory.
3074

3075
    @type hpath: str
3076
    @param hpath: the path to the hooks directory which
3077
        holds the scripts
3078
    @type phase: str
3079
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
3080
        L{constants.HOOKS_PHASE_POST}
3081
    @type env: dict
3082
    @param env: dictionary with the environment for the hook
3083
    @rtype: list
3084
    @return: list of 3-element tuples:
3085
      - script path
3086
      - script result, either L{constants.HKR_SUCCESS} or
3087
        L{constants.HKR_FAIL}
3088
      - output of the script
3089

3090
    @raise errors.ProgrammerError: for invalid input
3091
        parameters
3092

3093
    """
3094
    if phase == constants.HOOKS_PHASE_PRE:
3095
      suffix = "pre"
3096
    elif phase == constants.HOOKS_PHASE_POST:
3097
      suffix = "post"
3098
    else:
3099
      _Fail("Unknown hooks phase '%s'", phase)
3100

    
3101

    
3102
    subdir = "%s-%s.d" % (hpath, suffix)
3103
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3104

    
3105
    results = []
3106

    
3107
    if not os.path.isdir(dir_name):
3108
      # for non-existing/non-dirs, we simply exit instead of logging a
3109
      # warning at every operation
3110
      return results
3111

    
3112
    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3113

    
3114
    for (relname, relstatus, runresult)  in runparts_results:
3115
      if relstatus == constants.RUNPARTS_SKIP:
3116
        rrval = constants.HKR_SKIP
3117
        output = ""
3118
      elif relstatus == constants.RUNPARTS_ERR:
3119
        rrval = constants.HKR_FAIL
3120
        output = "Hook script execution error: %s" % runresult
3121
      elif relstatus == constants.RUNPARTS_RUN:
3122
        if runresult.failed:
3123
          rrval = constants.HKR_FAIL
3124
        else:
3125
          rrval = constants.HKR_SUCCESS
3126
        output = utils.SafeEncode(runresult.output.strip())
3127
      results.append(("%s/%s" % (subdir, relname), rrval, output))
3128

    
3129
    return results
3130

    
3131

    
3132
class IAllocatorRunner(object):
3133
  """IAllocator runner.
3134

3135
  This class is instantiated on the node side (ganeti-noded) and not on
3136
  the master side.
3137

3138
  """
3139
  @staticmethod
3140
  def Run(name, idata):
3141
    """Run an iallocator script.
3142

3143
    @type name: str
3144
    @param name: the iallocator script name
3145
    @type idata: str
3146
    @param idata: the allocator input data
3147

3148
    @rtype: tuple
3149
    @return: two element tuple of:
3150
       - status
3151
       - either error message or stdout of allocator (for success)
3152

3153
    """
3154
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3155
                                  os.path.isfile)
3156
    if alloc_script is None:
3157
      _Fail("iallocator module '%s' not found in the search path", name)
3158

    
3159
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3160
    try:
3161
      os.write(fd, idata)
3162
      os.close(fd)
3163
      result = utils.RunCmd([alloc_script, fin_name])
3164
      if result.failed:
3165
        _Fail("iallocator module '%s' failed: %s, output '%s'",
3166
              name, result.fail_reason, result.output)
3167
    finally:
3168
      os.unlink(fin_name)
3169

    
3170
    return result.stdout
3171

    
3172

    
3173
class DevCacheManager(object):
3174
  """Simple class for managing a cache of block device information.
3175

3176
  """
3177
  _DEV_PREFIX = "/dev/"
3178
  _ROOT_DIR = constants.BDEV_CACHE_DIR
3179

    
3180
  @classmethod
3181
  def _ConvertPath(cls, dev_path):
3182
    """Converts a /dev/name path to the cache file name.
3183

3184
    This replaces slashes with underscores and strips the /dev
3185
    prefix. It then returns the full path to the cache file.
3186

3187
    @type dev_path: str
3188
    @param dev_path: the C{/dev/} path name
3189
    @rtype: str
3190
    @return: the converted path name
3191

3192
    """
3193
    if dev_path.startswith(cls._DEV_PREFIX):
3194
      dev_path = dev_path[len(cls._DEV_PREFIX):]
3195
    dev_path = dev_path.replace("/", "_")
3196
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3197
    return fpath
3198

    
3199
  @classmethod
3200
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3201
    """Updates the cache information for a given device.
3202

3203
    @type dev_path: str
3204
    @param dev_path: the pathname of the device
3205
    @type owner: str
3206
    @param owner: the owner (instance name) of the device
3207
    @type on_primary: bool
3208
    @param on_primary: whether this is the primary
3209
        node nor not
3210
    @type iv_name: str
3211
    @param iv_name: the instance-visible name of the
3212
        device, as in objects.Disk.iv_name
3213

3214
    @rtype: None
3215

3216
    """
3217
    if dev_path is None:
3218
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
3219
      return
3220
    fpath = cls._ConvertPath(dev_path)
3221
    if on_primary:
3222
      state = "primary"
3223
    else:
3224
      state = "secondary"
3225
    if iv_name is None:
3226
      iv_name = "not_visible"
3227
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3228
    try:
3229
      utils.WriteFile(fpath, data=fdata)
3230
    except EnvironmentError, err:
3231
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3232

    
3233
  @classmethod
3234
  def RemoveCache(cls, dev_path):
3235
    """Remove data for a dev_path.
3236

3237
    This is just a wrapper over L{utils.RemoveFile} with a converted
3238
    path name and logging.
3239

3240
    @type dev_path: str
3241
    @param dev_path: the pathname of the device
3242

3243
    @rtype: None
3244

3245
    """
3246
    if dev_path is None:
3247
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
3248
      return
3249
    fpath = cls._ConvertPath(dev_path)
3250
    try:
3251
      utils.RemoveFile(fpath)
3252
    except EnvironmentError, err:
3253
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)