#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103,C0302

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint

# C0302: This module has become too big and should be split up


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti.storage import bdev
from ganeti.storage import drbd
from ganeti.storage import filestorage
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster
from ganeti import ht
from ganeti.storage.base import BlockDev
from ganeti.storage.drbd import DRBD8
from ganeti import hooksmaster


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
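
# Illustrative note (not part of the original module): with the
# "--separator=|" lvs invocations used below, a line such as
#   "  xenvg|test1|20.06|-wi-ao----"
# matches _LVSLINE_REGEX with groups
#   ("xenvg", "test1", "20.06", "-wi-ao----")
# i.e. VG name, LV name, size in MiB and the lv_attr string.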

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _GetInstReasonFilename(instance_name):
  """Path of the file containing the reason of the instance status change.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)


def _StoreInstReasonTrail(instance_name, trail):
  """Serialize a reason trail related to an instance change of state to file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: None

  """
  json = serializer.DumpJson(trail)
  filename = _GetInstReasonFilename(instance_name)
  utils.WriteFile(filename, data=json)


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
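
# Illustrative usage (not part of the original module): the format string
# and arguments are combined lazily, so
#   _Fail("Instance %s is not running", iname)
# logs the interpolated message via logging.error and raises RPCFail with
# the same text; exc=True switches logging to logging.exception, and
# log=False skips logging entirely.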


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
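
# Illustrative examples (not part of the original module) of the two
# encodings handled above:
#   _Decompress((constants.RPC_ENCODING_NONE, "hello")) == "hello"
#   payload = (constants.RPC_ENCODING_ZLIB_BASE64,
#              base64.b64encode(zlib.compress("hello")))
#   _Decompress(payload) == "hello"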


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
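
# Illustrative call (not part of the original module): only directories in
# _ALLOWED_CLEAN_DIRS may be cleaned, e.g.
#   _CleanDirectory(pathutils.QUEUE_DIR,
#                   exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
# removes every regular file in the queue directory except the lock file,
# while any other path fails via _Fail (see JobQueuePurge below).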


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
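
# Illustrative use of the decorator, mirroring ActivateMasterIp below:
#   @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
#                  _BuildMasterIpEnv)
#   def ActivateMasterIp(master_params, use_external_mip_script):
#     ...
# Pre-hooks under "master-ip-turnup" run before the function body and a
# failing pre-hook raises RPCFail; post-hooks run after it returns.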


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    L{RunLocalHooks} decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env
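
# Illustrative result (not part of the original module): for an IPv4
# master on eth0 with address 192.0.2.1/255.255.255.0, the hook
# environment would be
#   {"MASTER_NETDEV": "eth0", "MASTER_IP": "192.0.2.1",
#    "MASTER_NETMASK": "255.255.255.0", "CLUSTER_IP_VERSION": "4"}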


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate in. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      raise RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
                    " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      raise RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
                    " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    raise RPCFail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _CheckStorageParams(params, num_params):
  """Performs sanity checks for storage parameters.

  @type params: list
  @param params: list of storage parameters
  @type num_params: int
  @param num_params: expected number of parameters

  """
  if params is None:
    raise errors.ProgrammerError("No storage parameters for storage"
                                 " reporting are provided.")
  if not isinstance(params, list):
    raise errors.ProgrammerError("The storage parameters are not of type"
                                 " list: '%s'" % params)
  if not len(params) == num_params:
    raise errors.ProgrammerError("Did not receive the expected number of"
                                 " storage parameters: expected %s,"
                                 " received '%s'" % (num_params, len(params)))


def _CheckLvmStorageParams(params):
  """Performs sanity check for the 'exclusive storage' flag.

  @see: C{_CheckStorageParams}

  """
  _CheckStorageParams(params, 1)
  excl_stor = params[0]
  if not isinstance(params[0], bool):
    raise errors.ProgrammerError("Exclusive storage parameter is not"
                                 " boolean: '%s'." % excl_stor)
  return excl_stor


def _GetLvmVgSpaceInfo(name, params):
  """Wrapper around C{_GetVgInfo} which checks the storage parameters.

  @type name: string
  @param name: name of the volume group
  @type params: list
  @param params: list of storage parameters, which in this case should
    contain only the exclusive storage flag

  """
  excl_stor = _CheckLvmStorageParams(params)
  return _GetVgInfo(name, excl_stor)


def _GetVgInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVGInfo):
  """Retrieves information about an LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = info_fn([name], excl_stor)
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "type": constants.ST_LVM_VG,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }
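
# Illustrative return value (not part of the original module): for a
# volume group "xenvg" with 20 GiB free out of 40 GiB, _GetVgInfo yields
#   {"type": constants.ST_LVM_VG, "name": "xenvg",
#    "storage_free": 20480, "storage_size": 40960}
# with sizes reported in MiB.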


def _GetLvmPvSpaceInfo(name, params):
  """Wrapper around C{_GetVgSpindlesInfo} with sanity checks.

  @see: C{_GetLvmVgSpaceInfo}

  """
  excl_stor = _CheckLvmStorageParams(params)
  return _GetVgSpindlesInfo(name, excl_stor)


def _GetVgSpindlesInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVgSpindlesInfo):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary whose keys are "type", "name", "storage_free" and
      "storage_size" for the storage type, VG name, free spindles and
      total spindles respectively

  """
  if excl_stor:
    (vg_free, vg_size) = info_fn(name)
  else:
    vg_free = 0
    vg_size = 0
  return {
    "type": constants.ST_LVM_PV,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }


def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total amount of ram in MiB
    - hv_version: the hypervisor version, if available

  @type hvparams: dict of string
  @param hvparams: the hypervisor's hvparams

  """
  return get_hv_fn(name).GetNodeInfo(hvparams=hvparams)


def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information for all hypervisors.

  See C{_GetHvInfo} for information on the output.

  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams

  """
  if hv_specs is None:
    return None

  result = []
  for hvname, hvparams in hv_specs:
    result.append(_GetHvInfo(hvname, hvparams, get_hv_fn))
  return result


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list of results.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(storage_units, hv_specs):
  """Gives back a hash with different information about the node.

  @type storage_units: list of tuples (string, string, list)
  @param storage_units: List of tuples (storage unit, identifier, parameters) to
    ask for disk space information. In case of lvm-vg, the identifier is
    the VG name. The parameters can contain additional, storage-type-specific
    parameters, for example exclusive storage for lvm storage.
  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  storage_info = _GetNamedNodeInfo(
    storage_units,
    (lambda (storage_type, storage_key, storage_params):
        _ApplyStorageInfoFunction(storage_type, storage_key, storage_params)))
  hv_info = _GetHvInfoAll(hv_specs)
  return (bootid, storage_info, hv_info)
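
# Illustrative invocation (not part of the original module; kvm_hvparams
# stands in for the cluster's KVM parameters):
#   GetNodeInfo([(constants.ST_LVM_VG, "xenvg", [True])],
#               [(constants.HT_KVM, kvm_hvparams)])
# returns the node's boot ID, a list with one lvm-vg space report (as
# produced by _GetLvmVgSpaceInfo) and a list with one KVM node-info dict.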


def _GetFileStorageSpaceInfo(path, params):
  """Wrapper around filestorage.GetFileStorageSpaceInfo.

  The purpose of this wrapper is to call filestorage.GetFileStorageSpaceInfo
  and ignore the *args parameter to not leak it into the filestorage
  module's code.

  @see: C{filestorage.GetFileStorageSpaceInfo} for description of the
    parameters.

  """
  _CheckStorageParams(params, 0)
  return filestorage.GetFileStorageSpaceInfo(path)


# FIXME: implement storage reporting for all missing storage types.
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: _GetFileStorageSpaceInfo,
  constants.ST_LVM_PV: _GetLvmPvSpaceInfo,
  constants.ST_LVM_VG: _GetLvmVgSpaceInfo,
  constants.ST_SHARED_FILE: None,
  constants.ST_RADOS: None,
}


def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the results of the application of the storage space function (see
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
    storage type
  @raises NotImplementedError: for storage types that don't support space
    reporting yet

  """
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
  if fn is not None:
    return fn(storage_key, *args)
  else:
    raise NotImplementedError
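
# Illustrative dispatch (not part of the original module):
#   _ApplyStorageInfoFunction(constants.ST_LVM_VG, "xenvg", [True])
# calls _GetLvmVgSpaceInfo("xenvg", [True]), while
#   _ApplyStorageInfoFunction(constants.ST_RADOS, "pool", [])
# raises NotImplementedError since no reporting function is registered.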


def _CheckExclusivePvs(pvi_list):
  """Check that PVs are not shared among LVs.

  @type pvi_list: list of L{objects.LvmPvInfo} objects
  @param pvi_list: information about the PVs

  @rtype: list of tuples (string, list of strings)
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])

  """
  res = []
  for pvi in pvi_list:
    if len(pvi.lv_list) > 1:
      res.append((pvi.name, pvi.lv_list))
  return res


def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
                       get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hypervisor. Appends the results to the 'result' dictionary.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      hvparams = all_hvparams[hv_name]
      try:
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      result[constants.NV_HYPERVISOR][hv_name] = val


def _VerifyHvparams(what, vm_capable, result,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hvparams. Appends the results to the 'result' dictionary.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HVPARAMS in what:
    result[constants.NV_HVPARAMS] = []
    for source, hv_name, hvparams in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparams)
        get_hv_fn(hv_name).ValidateParameters(hvparams)
      except errors.HypervisorError, err:
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))


def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
  """Verifies the instance list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
                            all_hvparams=all_hvparams)
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val


def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
  """Verifies the node info.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_HVINFO in what and vm_capable:
    hvname = what[constants.NV_HVINFO]
    hyper = hypervisor.GetHypervisor(hvname)
    hvparams = all_hvparams[hvname]
    result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)


def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
  @type node_groups: a dict of strings
  @param node_groups: node _names_ mapped to their group uuids (it's enough to
      have only those nodes that are in `what["nodelist"]`)
  @type groups_cfg: a dict of dict of strings
  @param groups_cfg: a dictionary mapping group uuids to their configuration
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
  _VerifyHvparams(what, vm_capable, result)

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      params = groups_cfg.get(node_groups.get(node))
      ssh_port = params["ndparams"].get(constants.ND_SSH_PORT)
      logging.debug("Ssh port %s (None = default) for node %s",
                    str(ssh_port), node)
      success, message = _GetSshRunner(cluster_name). \
                            VerifyNodeHostname(node, ssh_port)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stating out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  _VerifyInstanceList(what, vm_capable, result, all_hvparams)

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_ACCEPTED_STORAGE_PATHS) == my_name:
    result[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        filestorage.ComputeWrongFileStoragePaths()

  if what.get(constants.NV_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_FILE_STORAGE_PATH] = pathresult

  if what.get(constants.NV_SHARED_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_SHARED_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_SHARED_FILE_STORAGE_PATH] = pathresult

  return result
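
# Illustrative `what` argument (not part of the original module); the
# master builds it from NV_* keys, e.g.
#   what = {
#     constants.NV_FILELIST: [pathutils.CLUSTER_CONF_FILE],
#     constants.NV_TIME: None,
#     constants.NV_VERSION: None,
#     }
# and the returned dictionary holds one entry per requested check, e.g.
# result[constants.NV_VERSION] == (protocol_version, release_version).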


def GetCryptoTokens(token_requests):
  """Perform actions on the node's cryptographic tokens.

  Token types can be 'ssl' or 'ssh'. So far only some actions are implemented
  for 'ssl'. Action 'get' returns the digest of the public client ssl
  certificate. Action 'create' creates a new client certificate and private
  key and also returns the digest of the certificate. The third element of a
  token request holds optional parameters for the actions; so far only the
  filename is supported.

  @type token_requests: list of tuples of (string, string, dict), where the
    first string is in constants.CRYPTO_TYPES, the second in
    constants.CRYPTO_ACTIONS. The third parameter is a dictionary of string
    to string.
  @param token_requests: list of requests of cryptographic tokens and actions
    to perform on them. The actions come with a dictionary of options.
  @rtype: list of tuples (string, string)
  @return: list of tuples of the token type and the public crypto token

  """
  _VALID_CERT_FILES = [pathutils.NODED_CERT_FILE,
                       pathutils.NODED_CLIENT_CERT_FILE,
                       pathutils.NODED_CLIENT_CERT_FILE_TMP]
  _DEFAULT_CERT_FILE = pathutils.NODED_CLIENT_CERT_FILE
  tokens = []
  for (token_type, action, options) in token_requests:
    if token_type not in constants.CRYPTO_TYPES:
      raise errors.ProgrammerError("Token type '%s' not supported." %
                                   token_type)
    if action not in constants.CRYPTO_ACTIONS:
      raise errors.ProgrammerError("Action '%s' is not supported." %
                                   action)
    if token_type == constants.CRYPTO_TYPE_SSL_DIGEST:
      if action == constants.CRYPTO_ACTION_CREATE:
        cert_filename = None
        if options:
          cert_filename = options.get(constants.CRYPTO_OPTION_CERT_FILE)
        if not cert_filename:
          cert_filename = _DEFAULT_CERT_FILE
        # For security reasons, we don't allow arbitrary filenames
        if not cert_filename in _VALID_CERT_FILES:
          raise errors.ProgrammerError(
            "The certificate file name path '%s' is not allowed." %
            cert_filename)
        utils.GenerateNewSslCert(
          True, cert_filename,
          "Create new client SSL certificate in %s." % cert_filename)
        tokens.append((token_type,
                       utils.GetClientCertificateDigest(
                         cert_filename=cert_filename)))
      elif action == constants.CRYPTO_ACTION_GET:
        tokens.append((token_type,
                       utils.GetClientCertificateDigest()))
  return tokens
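
# Illustrative request (not part of the original module): fetching the
# digest of the existing client certificate and creating a fresh one in
# the default location:
#   GetCryptoTokens([(constants.CRYPTO_TYPE_SSL_DIGEST,
#                     constants.CRYPTO_ACTION_GET, None),
#                    (constants.CRYPTO_TYPE_SSL_DIGEST,
#                     constants.CRYPTO_ACTION_CREATE, {})])
# returns a list of (token type, certificate digest) pairs.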


def GetBlockDevSizes(devices):
  """Return the size of the given block devices.

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @raise RPCFail: if any of the bridges is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceListForHypervisor(hname, hvparams=None,
                                 get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances of the given hypervisor.

  @type hname: string
  @param hname: name of the hypervisor
  @type hvparams: dict of strings
  @param hvparams: hypervisor parameters for the given hypervisor
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  try:
    hv = get_hv_fn(hname)
    names = hv.ListInstances(hvparams=hvparams)
    results.extend(names)
  except errors.HypervisorError, err:
    _Fail("Error enumerating instances (hypervisor %s): %s",
          hname, err, exc=True)
  return results


def GetInstanceList(hypervisor_list, all_hvparams=None,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query for instance
    information
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor types to respective
    cluster-wide hypervisor parameters
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
                                                get_hv_fn=get_hv_fn))
  return results


def GetInstanceInfo(instance, hname, hvparams=None):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance
  @type hvparams: dict of strings
  @param hvparams: the instance's hvparams

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: state of instance (HvInstanceState)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance,
                                                          hvparams=hvparams)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["vcpus"] = iinfo[3]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Computes whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.
  @raise RPCFail: if the instance is not running

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances(instance.hvparams):
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list, all_hvparams):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: mapping of hypervisor names to hvparams

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo(hvparams)
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output
1532

    
1533

    
1534
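# Illustrative sketch only: the shape of the mapping returned above for a
# node running two hypervisors; all names and numbers are hypothetical.
#
#   all_info = GetAllInstancesInfo([constants.HT_KVM, constants.HT_XEN_PVM],
#                                  {constants.HT_KVM: {},
#                                   constants.HT_XEN_PVM: {}})
#   # all_info == {"inst1.example.com": {"memory": 512, "vcpus": 1,
#   #                                    "state": "-b----", "time": 12.5},
#   #              ...}

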
def GetInstanceConsoleInfo(instance_param_dict,
                           get_hv_fn=hypervisor.GetHypervisor):
  """Gather data about the console access of a set of instances of this node.

  This function assumes that the caller already knows which instances are on
  this node, by calling a function such as L{GetAllInstancesInfo} or
  L{GetInstanceList}.

  For every instance, a large amount of configuration data needs to be
  provided to the hypervisor interface in order to receive the console
  information. Whether this could or should be cut down can be discussed.
  The information is provided in a dictionary indexed by instance name,
  allowing any number of instance queries to be done.

  @type instance_param_dict: dict of string to dict of dictionaries, where the
    inner dictionaries represent: L{objects.Instance}, L{objects.Node},
    L{objects.NodeGroup}, HvParams, BeParams
  @param instance_param_dict: mapping of instance name to parameters necessary
    for console information retrieval

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - instance: instance name
      - kind: console kind
      - message: used with kind == CONS_MESSAGE, indicates console to be
                 unavailable, supplies error message
      - host: host to connect to
      - port: port to use
      - user: user for login
      - command: the command, broken into parts as an array
      - display: unknown, potentially unused?

  """

  output = {}
  for inst_name in instance_param_dict:
    instance = instance_param_dict[inst_name]["instance"]
    pnode = instance_param_dict[inst_name]["node"]
    group = instance_param_dict[inst_name]["group"]
    hvparams = instance_param_dict[inst_name]["hvParams"]
    beparams = instance_param_dict[inst_name]["beParams"]

    instance = objects.Instance.FromDict(instance)
    pnode = objects.Node.FromDict(pnode)
    group = objects.NodeGroup.FromDict(group)

    h = get_hv_fn(instance.hypervisor)
    output[inst_name] = h.GetInstanceConsole(instance, pnode, group,
                                             hvparams, beparams).ToDict()

  return output


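# Illustrative sketch only: building the parameter mapping consumed above.
# "inst", "node" and "group" are assumed to be existing objects of the
# corresponding types; the instance name and the parameter dicts are
# hypothetical.
#
#   params = {
#     "inst1.example.com": {
#       "instance": inst.ToDict(),
#       "node": node.ToDict(),
#       "group": group.ToDict(),
#       "hvParams": filled_hvparams,
#       "beParams": filled_beparams,
#       },
#     }
#   consoles = GetInstanceConsoleInfo(params)

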
def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)


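# Illustrative sketch only: the filenames produced above for hypothetical
# arguments (the timestamp part comes from utils.TimestampForFilename):
#
#   _InstanceLogName("add", "debootstrap", "inst1.example.com", None)
#   # -> <LOG_OS_DIR>/add-debootstrap-inst1.example.com-<timestamp>.log
#   _InstanceLogName("import", "debootstrap", "inst1.example.com", "disk0")
#   # -> <LOG_OS_DIR>/import-debootstrap-inst1.example.com-disk0-<timestamp>.log

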
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


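# Illustrative sketch only: the effective invocation performed above for a
# reinstall. The OS path and log filename are hypothetical; the environment
# is the one built by OSEnvironment() plus INSTANCE_REINSTALL=1.
#
#   utils.RunCmd(["/srv/ganeti/os/debootstrap/create"],
#                env=create_env,          # includes INSTANCE_REINSTALL="1"
#                cwd="/srv/ganeti/os/debootstrap",
#                output="/var/log/ganeti/os/add-debootstrap-inst1...log",
#                reset_env=True)

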
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Returns symlink path for block device.

  """
  if _dir is None:
    _dir = pathutils.DISK_LINKS_DIR

  return utils.PathJoin(_dir,
                        ("%s%s%s" %
                         (instance_name, constants.DISK_SEPARATOR, idx)))


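# Illustrative sketch only: the symlink path built above for a hypothetical
# instance, assuming constants.DISK_SEPARATOR is ":" (its usual value).
#
#   _GetBlockDevSymlinkPath("inst1.example.com", 0)
#   # -> <DISK_LINKS_DIR>/inst1.example.com:0

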
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _CalculateDeviceURI(instance, disk, device):
  """Get the URI for the device.

  @type instance: L{objects.Instance}
  @param instance: the instance which the disk belongs to
  @type disk: L{objects.Disk}
  @param disk: the target disk object
  @type device: L{bdev.BlockDev}
  @param device: the corresponding BlockDevice
  @rtype: string
  @return: the device URI, if any, else None

  """
  access_mode = disk.params.get(constants.LDP_ACCESS,
                                constants.DISK_KERNELSPACE)
  if access_mode == constants.DISK_USERSPACE:
    # This can raise errors.BlockDeviceError
    return device.GetUserspaceAccessUri(instance.hypervisor)
  else:
    return None


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, link_name, drive_uri)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)
    uri = _CalculateDeviceURI(instance, disk, device)

    block_devices.append((disk, link_name, uri))

  return block_devices


def StartInstance(instance, startup_paused, reason, store_reason=True):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @type reason: list of reasons
  @param reason: the reason trail for this startup
  @type store_reason: boolean
  @param store_reason: whether to store the startup reason trail on file
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
    if store_reason:
      _StoreInstReasonTrail(instance.name, reason)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout, reason, store_reason=True):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this shutdown
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances(instance.hvparams):
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances(instance.hvparams):
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
        if store_reason:
          _StoreInstReasonTrail(instance.name, reason)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances(instance.hvparams):
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances(instance.hvparams):
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances(instance.hvparams):
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


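# Illustrative sketch only: the shutdown escalation implemented above.
# _TryShutdown() is retried via utils.Retry every 5 seconds until "timeout"
# expires (the first attempt is a soft stop, later ones pass retry=True);
# only then is a forced stop attempted. A hypothetical call, assuming a
# (source, reason, timestamp) reason-trail entry:
#
#   InstanceShutdown(inst, constants.DEFAULT_SHUTDOWN_TIMEOUT,
#                    reason=[("user", "maintenance", 1234567890)])

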
def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this reboot
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
      result = StartInstance(instance, False, reason, store_reason=False)
      _StoreInstReasonTrail(instance.name, reason)
      return result
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances(instance.hvparams)
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually an IP address), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(cluster_name, instance, target, live):
  """Migrates an instance to another node.

  @type cluster_name: string
  @param cluster_name: name of the cluster
  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(cluster_name, instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status.

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def HotplugDevice(instance, action, dev_type, device, extra, seq):
  """Hotplug a device.

  Hotplug is currently supported only for the KVM hypervisor.

  @type instance: L{objects.Instance}
  @param instance: the instance to which we hotplug a device
  @type action: string
  @param action: the hotplug action to perform
  @type dev_type: string
  @param dev_type: the device type to hotplug
  @type device: either L{objects.NIC} or L{objects.Disk}
  @param device: the device object to hotplug
  @type extra: string
  @param extra: extra info used by hotplug code (e.g. disk link)
  @type seq: int
  @param seq: the index of the device from master perspective
  @raise RPCFail: in case instance does not have KVM hypervisor

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.VerifyHotplugSupport(instance, action, dev_type)
  except errors.HotplugError, err:
    _Fail("Hotplug is not supported: %s", err)

  if action == constants.HOTPLUG_ACTION_ADD:
    fn = hyper.HotAddDevice
  elif action == constants.HOTPLUG_ACTION_REMOVE:
    fn = hyper.HotDelDevice
  elif action == constants.HOTPLUG_ACTION_MODIFY:
    fn = hyper.HotModDevice
  else:
    assert action in constants.HOTPLUG_ALL_ACTIONS

  return fn(instance, dev_type, device, extra, seq)


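# Illustrative sketch only: hot-adding a NIC as dispatched above. "inst" and
# "nic" are assumed to be existing objects, and HOTPLUG_TARGET_NIC is
# assumed to be the matching dev_type constant; extra is unused for NICs
# here (it carries the disk link for disks).
#
#   HotplugDevice(inst, constants.HOTPLUG_ACTION_ADD,
#                 constants.HOTPLUG_TARGET_NIC, nic, None, 0)

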
def HotplugSupported(instance):
  """Checks if hotplug is generally supported.

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.HotplugSupported(instance)
  except errors.HotplugError, err:
    _Fail("Hotplug is not supported: %s", err)


def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)


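# Illustrative sketch only: the command line built above for the
# hypothetical arguments path="/dev/xenvg/disk0", offset=0, size=1024.
# Since bs is one MiB (1048576 bytes), seek and count can be given directly
# in MiB units:
#
#   dd if=/dev/zero seek=0 bs=1048576 oflag=direct of=/dev/xenvg/disk0 \
#     count=1024

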
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects whose sync we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume
  @rtype: list
  @return: a list of (status, message) tuples, one for each disk; message
      is None on success

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path

    def _TryRemove():
      try:
        rdev.Remove()
        return []
      except errors.BlockDeviceError, err:
        return [str(err)]

    msgs.extend(utils.SimpleRetry([], _TryRemove,
                                  constants.DISK_REMOVE_RETRY_INTERVAL,
                                  constants.DISK_REMOVE_RETRY_TIMEOUT))

    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


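# Illustrative arithmetic for the "mcn" logic above: if a mirrored disk has
# two children and ChildrenNeeded() returns 1, then mcn = 2 - 1 = 1, i.e. at
# most one child may fail to assemble (be None) before the error is
# re-raised; with ChildrenNeeded() == -1, mcn becomes 0 and no failed
# children are tolerated.

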
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: tuple
  @return: a tuple with the C{/dev/...} path and the created symlink
      for primary nodes, and (C{True}, C{True}) for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, BlockDev):
      # pylint: disable=E1103
      dev_path = result.dev_path
      link_name = None
      if as_primary:
        link_name = _SymlinkBlockDev(owner, dev_path, idx)
    elif result:
      return result, result
    else:
      _Fail("Unexpected result from _RecursiveAssembleBD")
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return dev_path, link_name


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that, as opposed to
  assemble, we don't cache the children: shutting down a device
  doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           sync status information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetdimensions(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the pair (size, spindles), where spindles is None if the
      device doesn't support that

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualDimensions())
  return result


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


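# Illustrative sketch only: the on-disk layout parsed above. The API file
# holds one integer version per line; e.g. a file advertising both API 15
# and API 20 would contain:
#
#   15
#   20
#
# for which _OSOndiskAPIVersion() returns (True, [15, 20]).

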
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS API 10)
  @rtype: dict
  @return: dict of environment variables

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result


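# Illustrative sketch only: the environment built above for a hypothetical
# "debootstrap+default" OS with a single "dhcp" OS parameter. All values
# are examples:
#
#   {"OS_API_VERSION": "20",
#    "OS_NAME": "debootstrap",
#    "DEBUG_LEVEL": "0",
#    "OS_VARIANT": "default",
#    "OSP_DHCP": "yes",
#    "PATH": constants.HOOKS_PATH}

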
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS API 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    result["DISK_%d_UUID" % idx] = disk.uuid
    if disk.name:
      result["DISK_%d_NAME" % idx] = disk.name
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.DTS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type in constants.DTS_FILEBASED:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.logical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    result["NIC_%d_UUID" % idx] = nic.uuid
    if nic.name:
      result["NIC_%d_NAME" % idx] = nic.name
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.netinfo:
      nobj = objects.Network.FromDict(nic.netinfo)
      result.update(nobj.HooksDict("NIC_%d_" % idx))
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list of L{objects.ExtStorage}
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result


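# DiagnoseExtStorage result sketch (provider names, paths and messages are
# hypothetical): each candidate found under the search paths yields one
# 5-tuple, e.g.
#   ("ext-foo", "/srv/ganeti/extstorage/ext-foo", True, "",
#    [("redundancy", "replication level of the volumes")])
# for a valid provider, or (..., False, "<diagnose message>", []) for a
# broken one.
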
def BlockdevGrow(disk, amount, dryrun, backingstore, excl_stor):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the errors message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


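# BlockdevGrow usage sketch (assuming "disk" is an attached objects.Disk):
# for DRBD-backed disks the master grows the stack in two passes, first the
# backing storage, then the logical device, conceptually:
#   BlockdevGrow(disk, 1024, dryrun=False, backingstore=True,
#                excl_stor=False)   # grow the underlying LVs by 1 GiB
#   BlockdevGrow(disk, 1024, dryrun=False, backingstore=False,
#                excl_stor=False)   # then resize the DRBD device itself
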
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.DT_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.DT_PLAIN:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk on which to set the information
  @type info: string
  @param info: new 'info' metadata
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the errors message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
               "%s" % nic.network)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.logical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


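# FinalizeExport output sketch (values hypothetical; the section names are
# whatever constants.INISECT_EXP/constants.INISECT_INS etc. expand to): the
# written export config file looks roughly like
#   [export]
#   version = 0
#   timestamp = 1357000000
#   os = debootstrap
#   [instance]
#   name = inst1.example.com
#   maxmem = 1024
#   disk0_ivname = disk/0
#   disk_count = 1
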
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id); disk is
      an L{objects.Disk} object describing the current disk, and new
      unique_id is the name we rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  filestorage.CheckFileStoragePath(fs_dir)

  return os.path.normpath(fs_dir)


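# _TransformFileStorageDir sketch (paths hypothetical): with a cluster-wide
# file storage directory of /srv/ganeti/file-storage, a path such as
# "/srv/ganeti/file-storage/inst1" is accepted and returned normalized,
# while something like "/tmp/evil" is rejected by CheckFileStoragePath and
# the RPC fails.
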
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


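# _EnsureJobQueueFile sketch (paths hypothetical): assuming the queue
# directory is /var/lib/ganeti/queue, "/var/lib/ganeti/queue/job-123"
# passes the check, while "/tmp/job-123" (or anything escaping the
# directory via "..") makes the function raise RPCFail through _Fail.
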
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      which will contain the error details in case we
      failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


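# ValidateOS usage sketch (OS name and parameters hypothetical):
#   ValidateOS(True, "debootstrap+default",
#              [constants.OS_VALIDATE_PARAMETERS], {"dhcp": "yes"})
# fails the RPC if the OS is missing, does not declare the parameters, or
# its verify script rejects them; with required=False a missing OS simply
# returns False.
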
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


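# CreateX509Certificate usage sketch (validity hypothetical): the returned
# name identifies the key pair on this node and is later passed to
# RemoveX509Certificate, while the PEM certificate is handed to the peer:
#   (name, cert_pem) = CreateX509Certificate(24 * 60 * 60)
#   ...
#   RemoveX509Certificate(name)
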
def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we use nocreat to fail if the device is not already there or we pass a
      # wrong path; we use notrunc to not attempt to truncate an LV device
      suffix = utils.BuildShellCmd("| dd of=%s conv=nocreat,notrunc bs=%s",
                                   real_disk.dev_path,
                                   str(1024 * 1024)) # 1 MiB

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MiB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      real_disk = _OpenRealBD(disk)
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


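# _GetImportExportIoCommand sketch (device path hypothetical): for a
# raw-disk export of a 1024 MiB volume the returned pieces are roughly
#   prefix   = "dd if=/dev/xenvg/disk0 bs=1048576 count=1024 |"
#   suffix   = None
#   exp_size = 1024
# and the import/export daemon places its transport command in between.
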
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None
           if a status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running, it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(disks):
  """Finds attached L{BlockDev}s for the given disks.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we need to find

  @return: list of L{BlockDev} objects or C{None} if a given disk
           was not found or was not attached.

  """
  bdevs = []

  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will
  # decide it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      if multimaster:
        # In the multimaster case we have to wait explicitly until
        # the resource is Connected and UpToDate/UpToDate, because
        # we promote *both nodes* to primary directly afterwards.
        # Being in resync is not enough, since there is a race during which we
        # may promote a node with an Outdated disk to primary, effectively
        # tearing down the connection.
        all_connected = (all_connected and
                         stats.is_connected and
                         stats.is_disk_uptodate and
                         stats.peer_disk_uptodate)
      else:
        all_connected = (all_connected and
                         (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


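# DrbdWaitSync result sketch: the master polls this call until the first
# element turns True; e.g. a device at 42.1% resync yields (False, 42.1),
# while fully synchronized devices yield (True, 100).
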
def DrbdNeedsActivation(disks):
  """Checks which of the passed disks needs activation and returns their UUIDs.

  """
  faulty_disks = []

  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      faulty_disks.append(disk)
      continue

    stats = rd.GetProcStatus()
    if stats.is_standalone or stats.is_diskless:
      faulty_disks.append(disk)

  return [disk.uuid for disk in faulty_disks]


def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
  try:
    return drbd.DRBD8.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type, hvparams=None):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode(hvparams=hvparams)


def _VerifyRestrictedCmdName(cmd):
  """Verifies a restricted command name.

  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  if not cmd.strip():
    return (False, "Missing command name")

  if os.path.basename(cmd) != cmd:
    return (False, "Invalid command name")

  if not constants.EXT_PLUGIN_MASK.match(cmd):
    return (False, "Command name contains forbidden characters")

  return (True, None)


def _CommonRestrictedCmdCheck(path, owner):
  """Common checks for restricted command file system directories and files.

  @type path: string
  @param path: Path to check
  @param owner: C{None} or tuple containing UID and GID
  @rtype: tuple; (boolean, string or C{os.stat} result)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's the result of C{os.stat}

  """
  if owner is None:
    # Default to root as owner
    owner = (0, 0)

  try:
    st = os.stat(path)
  except EnvironmentError, err:
    return (False, "Can't stat(2) '%s': %s" % (path, err))

  if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
    return (False, "Permissions on '%s' are too permissive" % path)

  if (st.st_uid, st.st_gid) != owner:
    (owner_uid, owner_gid) = owner
    return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))

  return (True, st)


def _VerifyRestrictedCmdDirectory(path, _owner=None):
  """Verifies restricted command directory.

  @type path: string
  @param path: Path to check
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  (status, value) = _CommonRestrictedCmdCheck(path, _owner)

  if not status:
    return (False, value)

  if not stat.S_ISDIR(value.st_mode):
    return (False, "Path '%s' is not a directory" % path)

  return (True, None)


def _VerifyRestrictedCmd(path, cmd, _owner=None):
  """Verifies a whole restricted command and returns its executable filename.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise the second element is the
    absolute path to the executable

  """
  executable = utils.PathJoin(path, cmd)

  (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)

  if not status:
    return (False, msg)

  if not utils.IsExecutable(executable):
    return (False, "access(2) thinks '%s' can't be executed" % executable)

  return (True, executable)


def _PrepareRestrictedCmd(path, cmd,
                          _verify_dir=_VerifyRestrictedCmdDirectory,
                          _verify_name=_VerifyRestrictedCmdName,
                          _verify_cmd=_VerifyRestrictedCmd):
  """Performs a number of tests on a restricted command.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @return: Same as L{_VerifyRestrictedCmd}

  """
  # Verify the directory first
  (status, msg) = _verify_dir(path)
  if status:
    # Check command if everything was alright
    (status, msg) = _verify_name(cmd)

  if not status:
    return (False, msg)

  # Check actual executable
  return _verify_cmd(path, cmd)


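# _PrepareRestrictedCmd sketch (directory path hypothetical): for
# _PrepareRestrictedCmd("/etc/ganeti/restricted-commands", "mycmd") the
# checks run directory -> name -> executable and return e.g.
#   (True, "/etc/ganeti/restricted-commands/mycmd")
# on success, or (False, "<error message>") at the first failed test.
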
def RunRestrictedCmd(cmd,
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
                     _sleep_fn=time.sleep,
                     _prepare_fn=_PrepareRestrictedCmd,
                     _runcmd_fn=utils.RunCmd,
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
  """Executes a restricted command after performing strict tests.

  @type cmd: string
  @param cmd: Command name
  @rtype: string
  @return: Command output
  @raise RPCFail: In case of an error

  """
  logging.info("Preparing to run restricted command '%s'", cmd)

  if not _enabled:
    _Fail("Restricted commands disabled at configure time")

  lock = None
  try:
    cmdresult = None
    try:
      lock = utils.FileLock.Open(_lock_file)
      lock.Exclusive(blocking=True, timeout=_lock_timeout)

      (status, value) = _prepare_fn(_path, cmd)

      if status:
        cmdresult = _runcmd_fn([value], env={}, reset_env=True,
                               postfork_fn=lambda _: lock.Unlock())
      else:
        logging.error(value)
    except Exception: # pylint: disable=W0703
      # Keep original error in log
      logging.exception("Caught exception")

    if cmdresult is None:
      logging.info("Sleeping for %0.1f seconds before returning",
                   _RCMD_INVALID_DELAY)
      _sleep_fn(_RCMD_INVALID_DELAY)

      # Do not include original error message in returned error
      _Fail("Executing command '%s' failed" % cmd)
    elif cmdresult.failed or cmdresult.fail_reason:
      _Fail("Restricted command '%s' failed: %s; output: %s",
            cmd, cmdresult.fail_reason, cmdresult.output)
    else:
      return cmdresult.output
  finally:
    if lock is not None:
      # Release lock at last
      lock.Close()
      lock = None


def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    logging.info("Received request to no longer pause watcher")
    utils.RemoveFile(_filename)
  else:
    logging.info("Received request to pause watcher until %s", until)

    if not ht.TNumber(until):
      _Fail("Duration must be numeric")

    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)


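# SetWatcherPause usage sketch (timestamp hypothetical):
#   SetWatcherPause(time.time() + 3600)   # pause the watcher for one hour
#   SetWatcherPause(None)                 # remove the pause file again
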
def ConfigureOVS(ovs_name, ovs_link):
  """Creates an OpenvSwitch on the node.

  This function sets up an OpenvSwitch on the node with the given name
  and connects it via a given Ethernet device.

  @type ovs_name: string
  @param ovs_name: Name of the OpenvSwitch to create.
  @type ovs_link: None or string
  @param ovs_link: Ethernet device for outside connection (can be missing)

  """
  # Initialize the OpenvSwitch
  result = utils.RunCmd(["ovs-vsctl", "add-br", ovs_name])
  if result.failed:
    _Fail("Failed to create openvswitch. Script return value: %s, output: '%s'"
          % (result.exit_code, result.output), log=True)

  # And connect it to a physical interface, if given
  if ovs_link:
    result = utils.RunCmd(["ovs-vsctl", "add-port", ovs_name, ovs_link])
    if result.failed:
      _Fail("Failed to connect openvswitch to interface %s. Script return"
            " value: %s, output: '%s'" % (ovs_link, result.exit_code,
            result.output), log=True)


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata, ial_params):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data
    @type ial_params: list
    @param ial_params: the iallocator parameters

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name] + ial_params)
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)
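

# DevCacheManager._ConvertPath sketch (device path hypothetical): the cache
# file name strips the /dev/ prefix and flattens slashes, e.g.
#   DevCacheManager._ConvertPath("/dev/xenvg/inst1-disk0")
#   # -> <pathutils.BDEV_CACHE_DIR>/bdev_xenvg_inst1-disk0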