#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103,C0302

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint

# C0302: This module has become too big and should be split up


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti.storage import bdev
from ganeti.storage import drbd
from ganeti.storage import filestorage
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster
from ganeti import ht
from ganeti.storage.base import BlockDev
from ganeti.storage.drbd import DRBD8
from ganeti import hooksmaster


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
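
# Illustrative sketch (not part of the original module): the regex above
# splits one pipe-separated "lvs" report line into VG name, LV name, size
# and the (at least six-character) attribute string.
def _ExampleLvsLineMatch():
  groups = _LVSLINE_REGEX.match("xenvg|test1|20.06|-wi-ao|").groups()
  assert groups == ("xenvg", "test1", "20.06", "-wi-ao")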

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _GetInstReasonFilename(instance_name):
  """Path of the file containing the reason of the instance status change.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)


def _StoreInstReasonTrail(instance_name, trail):
  """Serialize a reason trail related to an instance change of state to file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: None

  """
  json = serializer.DumpJson(trail)
  filename = _GetInstReasonFilename(instance_name)
  utils.WriteFile(filename, data=json)


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
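

# Illustrative sketch (not part of the original module): _Fail interpolates
# positional arguments into the message, logs it (unless log=False), and
# raises RPCFail.
def _ExampleFailUsage(path):
  try:
    _Fail("Path '%s' is not allowed", path, log=False)
  except RPCFail, err:
    return str(err)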


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
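

# Illustrative sketch (not part of the original module): round-tripping the
# two wire encodings accepted by _Decompress.
def _ExampleDecompressRoundTrip():
  payload = "hello world"
  plain = (constants.RPC_ENCODING_NONE, payload)
  packed = (constants.RPC_ENCODING_ZLIB_BASE64,
            base64.b64encode(zlib.compress(payload)))
  assert _Decompress(plain) == _Decompress(packed) == payload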


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: tuple
  @return: True, None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
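

# Illustrative sketch (not part of the original module): decorating a function
# with RunLocalHooks; the env builder and the decorated body are hypothetical.
# Pre-hooks run before the body (a failing pre-hook raises RPCFail), and
# post-hooks run after it.
def _ExampleEnvBuilder(value):
  return {"EXAMPLE_VALUE": str(value)}


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _ExampleEnvBuilder)
def _ExampleHookedFunction(value):
  return value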


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    _RunLocalHooks decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env
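

# Illustrative sketch (not part of the original module): the environment the
# master IP hooks receive; the stub class stands in for a hypothetical
# objects.MasterNetworkParameters instance, and the family attribute assumes
# netutils.IP4Address.family.
def _ExampleMasterIpEnv():
  class _FakeMasterParams(object):
    netdev = "eth0"
    ip = "192.0.2.1"
    netmask = 24
    ip_family = netutils.IP4Address.family
  env = _BuildMasterIpEnv(_FakeMasterParams())
  assert env["CLUSTER_IP_VERSION"] == "4"
  return env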


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shut down the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _CheckStorageParams(params, num_params):
  """Performs sanity checks for storage parameters.

  @type params: list
  @param params: list of storage parameters
  @type num_params: int
  @param num_params: expected number of parameters

  """
  if params is None:
    raise errors.ProgrammerError("No storage parameters for storage"
                                 " reporting is provided.")
  if not isinstance(params, list):
    raise errors.ProgrammerError("The storage parameters are not of type"
                                 " list: '%s'" % params)
  if not len(params) == num_params:
    raise errors.ProgrammerError("Did not receive the expected number of"
                                 " storage parameters: expected %s,"
                                 " received '%s'" % (num_params, len(params)))


def _CheckLvmStorageParams(params):
  """Performs sanity check for the 'exclusive storage' flag.

  @see: C{_CheckStorageParams}

  """
  _CheckStorageParams(params, 1)
  excl_stor = params[0]
  if not isinstance(params[0], bool):
    raise errors.ProgrammerError("Exclusive storage parameter is not"
                                 " boolean: '%s'." % excl_stor)
  return excl_stor


def _GetLvmVgSpaceInfo(name, params):
  """Wrapper around C{_GetVgInfo} which checks the storage parameters.

  @type name: string
  @param name: name of the volume group
  @type params: list
  @param params: list of storage parameters, which in this case should
    contain only one item: the exclusive storage flag

  """
  excl_stor = _CheckLvmStorageParams(params)
  return _GetVgInfo(name, excl_stor)


def _GetVgInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVGInfo):
  """Retrieves information about an LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = info_fn([name], excl_stor)
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "type": constants.ST_LVM_VG,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }
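

# Illustrative sketch (not part of the original module): exercising _GetVgInfo
# through its injectable info_fn (the hook that exists for testability); the
# fake function mimics bdev.LogicalVolume.GetVGInfo returning one
# (free, size, name) row.
def _ExampleVgInfo():
  fake_info_fn = lambda names, excl_stor: [(1024.0, 4096.0, names[0])]
  info = _GetVgInfo("xenvg", False, info_fn=fake_info_fn)
  assert (info["storage_free"], info["storage_size"]) == (1024, 4096)
  return info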


def _GetLvmPvSpaceInfo(name, params):
  """Wrapper around C{_GetVgSpindlesInfo} with sanity checks.

  @see: C{_GetLvmVgSpaceInfo}

  """
  excl_stor = _CheckLvmStorageParams(params)
  return _GetVgSpindlesInfo(name, excl_stor)


def _GetVgSpindlesInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVgSpindlesInfo):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary whose keys are "type", "name", "storage_free" and
      "storage_size", the latter two being the free and total spindles
      respectively

  """
  if excl_stor:
    (vg_free, vg_size) = info_fn(name)
  else:
    vg_free = 0
    vg_size = 0
  return {
    "type": constants.ST_LVM_PV,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }


def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total amount of ram in MiB
    - hv_version: the hypervisor version, if available

  @type hvparams: dict of string
  @param hvparams: the hypervisor's hvparams

  """
  return get_hv_fn(name).GetNodeInfo(hvparams=hvparams)


def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information for all hypervisors.

  See C{_GetHvInfo} for information on the output.

  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams

  """
  if hv_specs is None:
    return None

  result = []
  for hvname, hvparams in hv_specs:
    result.append(_GetHvInfo(hvname, hvparams, get_hv_fn))
  return result


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list of the results.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(storage_units, hv_specs):
  """Gives back a hash with different information about the node.

  @type storage_units: list of tuples (string, string, list)
  @param storage_units: List of tuples (storage unit, identifier, parameters) to
    ask for disk space information. In case of lvm-vg, the identifier is
    the VG name. The parameters can contain additional, storage-type-specific
    parameters, for example exclusive storage for lvm storage.
  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  storage_info = _GetNamedNodeInfo(
    storage_units,
    (lambda (storage_type, storage_key, storage_params):
        _ApplyStorageInfoFunction(storage_type, storage_key, storage_params)))
  hv_info = _GetHvInfoAll(hv_specs)
  return (bootid, storage_info, hv_info)


def _GetFileStorageSpaceInfo(path, params):
  """Wrapper around filestorage.GetSpaceInfo.

  The purpose of this wrapper is to call filestorage.GetFileStorageSpaceInfo
  and ignore the *args parameter to not leak it into the filestorage
  module's code.

  @see: C{filestorage.GetFileStorageSpaceInfo} for description of the
    parameters.

  """
  _CheckStorageParams(params, 0)
  return filestorage.GetFileStorageSpaceInfo(path)


# FIXME: implement storage reporting for all missing storage types.
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: _GetFileStorageSpaceInfo,
  constants.ST_LVM_PV: _GetLvmPvSpaceInfo,
  constants.ST_LVM_VG: _GetLvmVgSpaceInfo,
  constants.ST_SHARED_FILE: None,
  constants.ST_RADOS: None,
}


def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the results of the application of the storage space function (see
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
    storage type
  @raises NotImplementedError: for storage types that don't support space
    reporting yet

  """
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
  if fn is not None:
    return fn(storage_key, *args)
  else:
    raise NotImplementedError
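

# Illustrative sketch (not part of the original module): the dispatch table
# above routes lvm-vg queries to _GetLvmVgSpaceInfo, while storage types
# mapped to None raise NotImplementedError.
def _ExampleStorageDispatch():
  try:
    _ApplyStorageInfoFunction(constants.ST_BLOCK, "ignored")
  except NotImplementedError:
    pass  # expected: no reporting function for ST_BLOCK yet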


def _CheckExclusivePvs(pvi_list):
  """Check that PVs are not shared among LVs.

  @type pvi_list: list of L{objects.LvmPvInfo} objects
  @param pvi_list: information about the PVs

  @rtype: list of tuples (string, list of strings)
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])

  """
  res = []
  for pvi in pvi_list:
    if len(pvi.lv_list) > 1:
      res.append((pvi.name, pvi.lv_list))
  return res
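

# Illustrative sketch (not part of the original module): a PV hosting more
# than one LV is reported as offending; the stub below stands in for
# objects.LvmPvInfo.
def _ExampleExclusivePvCheck():
  class _FakePvInfo(object):
    def __init__(self, name, lv_list):
      self.name = name
      self.lv_list = lv_list
  pvs = [_FakePvInfo("/dev/sda1", ["lv1"]),
         _FakePvInfo("/dev/sdb1", ["lv1", "lv2"])]
  assert _CheckExclusivePvs(pvs) == [("/dev/sdb1", ["lv1", "lv2"])]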


def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
                       get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hypervisor. Appends the results to the 'results' list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      hvparams = all_hvparams[hv_name]
      try:
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      result[constants.NV_HYPERVISOR][hv_name] = val


def _VerifyHvparams(what, vm_capable, result,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hvparams. Appends the results to the 'results' list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HVPARAMS in what:
    result[constants.NV_HVPARAMS] = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        get_hv_fn(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))


def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
  """Verifies the instance list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
                            all_hvparams=all_hvparams)
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val


def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
  """Verifies the node info.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_HVINFO in what and vm_capable:
    hvname = what[constants.NV_HVINFO]
    hyper = hypervisor.GetHypervisor(hvname)
    hvparams = all_hvparams[hvname]
    result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)


def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
  @type node_groups: a dict of strings
  @param node_groups: node _names_ mapped to their group uuids (it's enough to
      have only those nodes that are in `what["nodelist"]`)
  @type groups_cfg: a dict of dict of strings
  @param groups_cfg: a dictionary mapping group uuids to their configuration
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
  _VerifyHvparams(what, vm_capable, result)

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      params = groups_cfg.get(node_groups.get(node))
      ssh_port = params["ndparams"].get(constants.ND_SSH_PORT)
      logging.debug("Ssh port %s (None = default) for node %s",
                    str(ssh_port), node)
      success, message = _GetSshRunner(cluster_name). \
                            VerifyNodeHostname(node, ssh_port)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  _VerifyInstanceList(what, vm_capable, result, all_hvparams)

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_ACCEPTED_STORAGE_PATHS) == my_name:
    result[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        filestorage.ComputeWrongFileStoragePaths()

  if what.get(constants.NV_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_FILE_STORAGE_PATH] = pathresult

  if what.get(constants.NV_SHARED_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_SHARED_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_SHARED_FILE_STORAGE_PATH] = pathresult

  return result


def GetCryptoTokens(token_types):
  """Get the node's public cryptographic tokens.

  This can be the public ssh key of the node or the certificate digest of
  the node's public client SSL certificate.

  Note: so far, only retrieval of the SSL digest is implemented

  @type token_types: list of strings in constants.CRYPTO_TYPES
  @param token_types: list of types of requested cryptographic tokens
  @rtype: list of tuples (string, string)
  @return: list of tuples of the token type and the public crypto token

  """
  tokens = []
  for token_type in token_types:
    if token_type not in constants.CRYPTO_TYPES:
      raise errors.ProgrammerError("Token type %s not supported." %
                                   token_type)
    if token_type == constants.CRYPTO_TYPE_SSL_DIGEST:
      tokens.append((token_type, utils.GetClientCertificateDigest()))
  return tokens


def GetBlockDevSizes(devices):
  """Return the size of the given block devices.

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs
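

# Illustrative sketch (not part of the original module): how the lv_attr
# character positions are read above; for "-wi-ao", position 4 ('a') means
# the LV is active, position 5 ('o') means it is open, and position 0 would
# be 'v' for a virtual volume.
def _ExampleLvAttrBits():
  attr = "-wi-ao"
  inactive = attr[4] == "-"
  online = attr[5] == "o"
  virtual = attr[0] == "v"
  assert (inactive, online, virtual) == (False, True, False)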


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceListForHypervisor(hname, hvparams=None,
                                 get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances of the given hypervisor.

  @type hname: string
  @param hname: name of the hypervisor
  @type hvparams: dict of strings
  @param hvparams: hypervisor parameters for the given hypervisor
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  try:
    hv = get_hv_fn(hname)
    names = hv.ListInstances(hvparams=hvparams)
    results.extend(names)
  except errors.HypervisorError, err:
    _Fail("Error enumerating instances (hypervisor %s): %s",
          hname, err, exc=True)
  return results


def GetInstanceList(hypervisor_list, all_hvparams=None,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor types to respective
    cluster-wide hypervisor parameters
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
                                                get_hv_fn=get_hv_fn))
  return results


def GetInstanceInfo(instance, hname, hvparams=None):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance
  @type hvparams: dict of strings
  @param hvparams: the instance's hvparams

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: state of instance (HvInstanceState)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance,
                                                          hvparams=hvparams)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["vcpus"] = iinfo[3]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Computes whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances(instance.hvparams):
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list, all_hvparams):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: mapping of hypervisor names to hvparams

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo(hvparams)
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def GetInstanceConsoleInfo(instance_param_dict,
                           get_hv_fn=hypervisor.GetHypervisor):
  """Gather data about the console access of a set of instances of this node.

  This function assumes that the caller already knows which instances are on
  this node, by calling a function such as L{GetAllInstancesInfo} or
  L{GetInstanceList}.

  For every instance, a large amount of configuration data needs to be
  provided to the hypervisor interface in order to receive the console
  information. Whether this could or should be cut down can be discussed.
  The information is provided in a dictionary indexed by instance name,
  allowing any number of instance queries to be done.

  @type instance_param_dict: dict of string to tuple of dictionaries, where the
    dictionaries represent: L{objects.Instance}, L{objects.Node},
    L{objects.NodeGroup}, HvParams, BeParams
  @param instance_param_dict: mapping of instance name to parameters necessary
    for console information retrieval

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - instance: instance name
      - kind: console kind
      - message: used with kind == CONS_MESSAGE, indicates console to be
                 unavailable, supplies error message
      - host: host to connect to
      - port: port to use
      - user: user for login
      - command: the command, broken into parts as an array
      - display: unknown, potentially unused?

  """

  output = {}
  for inst_name in instance_param_dict:
    instance = instance_param_dict[inst_name]["instance"]
    pnode = instance_param_dict[inst_name]["node"]
    group = instance_param_dict[inst_name]["group"]
    hvparams = instance_param_dict[inst_name]["hvParams"]
    beparams = instance_param_dict[inst_name]["beParams"]

    instance = objects.Instance.FromDict(instance)
    pnode = objects.Node.FromDict(pnode)
    group = objects.NodeGroup.FromDict(group)

    h = get_hv_fn(instance.hypervisor)
    output[inst_name] = h.GetInstanceConsole(instance, pnode, group,
                                             hvparams, beparams).ToDict()

  return output


1555
def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)


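# Illustrative example only (all names made up): for kind "add", OS
# "debootstrap", instance "inst1.example.com" and component None, the
# path computed above looks like
#   <LOG_OS_DIR>/add-debootstrap-inst1.example.com-<timestamp>.log

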
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS rename script should be run
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Returns symlink path for block device.

  """
  if _dir is None:
    _dir = pathutils.DISK_LINKS_DIR

  return utils.PathJoin(_dir,
                        ("%s%s%s" %
                         (instance_name, constants.DISK_SEPARATOR, idx)))


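# Illustrative example (directory and separator are build-time settings,
# assumed here to be the defaults): _GetBlockDevSymlinkPath(
# "inst1.example.com", 0) would return something like
#   /var/run/ganeti/instance-disks/inst1.example.com:0

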
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _CalculateDeviceURI(instance, disk, device):
  """Get the URI for the device.

  @type instance: L{objects.Instance}
  @param instance: the instance to which the disk belongs
  @type disk: L{objects.Disk}
  @param disk: the target disk object
  @type device: L{bdev.BlockDev}
  @param device: the corresponding BlockDevice
  @rtype: string
  @return: the device URI, if any, else None

  """
  access_mode = disk.params.get(constants.LDP_ACCESS,
                                constants.DISK_KERNELSPACE)
  if access_mode == constants.DISK_USERSPACE:
    # This can raise errors.BlockDeviceError
    return device.GetUserspaceAccessUri(instance.hypervisor)
  else:
    return None


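# Illustrative note (sketch, not from this module): the userspace URI
# returned above is hypervisor- and backend-specific; e.g. for an RBD
# disk accessed by KVM it would take a form like "rbd:<pool>/<volume>",
# as produced by that block device's GetUserspaceAccessUri().

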
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, link_name, drive_uri)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)
    uri = _CalculateDeviceURI(instance, disk, device)

    block_devices.append((disk, link_name, uri))

  return block_devices


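# Illustrative sketch of the returned list (values are made up): one
# (disk object, symlink, userspace URI or None) tuple per disk, e.g.
#
#   [(disk0, "/var/run/ganeti/instance-disks/inst1.example.com:0", None),
#    (disk1, "/var/run/ganeti/instance-disks/inst1.example.com:1",
#     "rbd:rbd/inst1-disk1")]

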
def StartInstance(instance, startup_paused, reason, store_reason=True):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @type reason: list of reasons
  @param reason: the reason trail for this startup
  @type store_reason: boolean
  @param store_reason: whether to store the startup reason trail on file
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
    if store_reason:
      _StoreInstReasonTrail(instance.name, reason)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout, reason, store_reason=True):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this shutdown
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances(instance.hvparams):
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances(instance.hvparams):
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
        if store_reason:
          _StoreInstReasonTrail(instance.name, reason)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances(instance.hvparams):
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances(instance.hvparams):
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances(instance.hvparams):
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


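# Illustrative note on the retry pattern above (semantics inferred from
# this call site): utils.Retry(fn, 5, timeout) keeps invoking fn while it
# raises utils.RetryAgain, waiting about 5 seconds between attempts, and
# raises utils.RetryTimeout once "timeout" seconds have passed; the
# tried_once flag makes the second and later soft-shutdown attempts run
# with retry=True before the hard StopInstance(force=True) fallback.

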
def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this reboot
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
      result = StartInstance(instance, False, reason, store_reason=False)
      _StoreInstReasonTrail(instance.name, reason)
      return result
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances(instance.hvparams)
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(cluster_name, instance, target, live):
  """Migrates an instance to another node.

  @type cluster_name: string
  @param cluster_name: name of the cluster
  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(cluster_name, instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status.

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def HotplugDevice(instance, action, dev_type, device, extra, seq):
  """Hotplug a device.

  Hotplug is currently supported only for the KVM hypervisor.

  @type instance: L{objects.Instance}
  @param instance: the instance to which we hotplug a device
  @type action: string
  @param action: the hotplug action to perform
  @type dev_type: string
  @param dev_type: the device type to hotplug
  @type device: either L{objects.NIC} or L{objects.Disk}
  @param device: the device object to hotplug
  @type extra: string
  @param extra: extra info used by hotplug code (e.g. disk link)
  @type seq: int
  @param seq: the index of the device from master perspective
  @raise RPCFail: in case instance does not have KVM hypervisor

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.VerifyHotplugSupport(instance, action, dev_type)
  except errors.HotplugError, err:
    _Fail("Hotplug is not supported: %s", err)

  if action == constants.HOTPLUG_ACTION_ADD:
    fn = hyper.HotAddDevice
  elif action == constants.HOTPLUG_ACTION_REMOVE:
    fn = hyper.HotDelDevice
  elif action == constants.HOTPLUG_ACTION_MODIFY:
    fn = hyper.HotModDevice
  else:
    assert action in constants.HOTPLUG_ALL_ACTIONS

  return fn(instance, dev_type, device, extra, seq)


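# Illustrative dispatch example (constants.HOTPLUG_TARGET_NIC is assumed
# to be the NIC device-type constant): hot-adding a NIC at index 2 would
# be invoked roughly as
#
#   HotplugDevice(instance, constants.HOTPLUG_ACTION_ADD,
#                 constants.HOTPLUG_TARGET_NIC, nic, None, 2)

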
def HotplugSupported(instance):
  """Checks if hotplug is generally supported.

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.HotplugSupported(instance)
  except errors.HotplugError, err:
    _Fail("Hotplug is not supported: %s", err)


def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)


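# Illustrative expansion of the command built above (device path made
# up): wiping the first 128 MiB of /dev/xvdb runs
#
#   dd if=/dev/zero seek=0 bs=1048576 oflag=direct of=/dev/xvdb count=128
#
# i.e. seek and count are both expressed in 1 MiB blocks, matching the
# MiB units used internally.

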
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects whose sync we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path

    def _TryRemove():
      try:
        rdev.Remove()
        return []
      except errors.BlockDeviceError, err:
        return [str(err)]

    msgs.extend(utils.SimpleRetry([], _TryRemove,
                                  constants.DISK_REMOVE_RETRY_INTERVAL,
                                  constants.DISK_REMOVE_RETRY_TIMEOUT))

    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)
  else:
    result = True
  return result


def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: tuple
  @return: a tuple with the C{/dev/...} path and the created symlink
      for primary nodes, and (C{True}, C{True}) for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, BlockDev):
      # pylint: disable=E1103
      dev_path = result.dev_path
      link_name = None
      if as_primary:
        link_name = _SymlinkBlockDev(owner, dev_path, idx)
    elif result:
      return result, result
    else:
      _Fail("Unexpected result from _RecursiveAssembleBD")
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return dev_path, link_name


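# Illustrative note (values made up): on a primary node this returns the
# device path plus its symlink, e.g.
#   ("/dev/drbd0", "/var/run/ganeti/instance-disks/inst1.example.com:0")
# while on a secondary node _RecursiveAssembleBD returns True, so the
# function returns (True, True).

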
def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble; shutting down different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           sync status information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetdimensions(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the pair (size, spindles), where spindles is None if the
      device doesn't support that

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualDimensions())
  return result


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


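# Illustrative example (file name per constants.OS_API_FILE, contents
# made up): an OS definition supporting API versions 15 and 20 ships a
# version file containing
#
#   15
#   20
#
# for which the function above returns (True, [15, 20]).

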
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result


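# Illustrative example of the environment computed above (values made
# up): for os_name "debootstrap+default" and os_params
# {"mirror": "http://deb.example.com"}, the result would contain roughly
#   OS_API_VERSION=20, OS_NAME=debootstrap, OS_VARIANT=default,
#   OSP_MIRROR=http://deb.example.com, DEBUG_LEVEL=0, PATH=<HOOKS_PATH>

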
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    result["DISK_%d_UUID" % idx] = disk.uuid
    if disk.name:
      result["DISK_%d_NAME" % idx] = disk.name
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.DTS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type in constants.DTS_FILEBASED:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.logical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    result["NIC_%d_UUID" % idx] = nic.uuid
    if nic.name:
      result["NIC_%d_NAME" % idx] = nic.name
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.netinfo:
      nobj = objects.Network.FromDict(nic.netinfo)
      result.update(nobj.HooksDict("NIC_%d_" % idx))
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def DiagnoseExtStorage(top_dirs=None):
3012
  """Compute the validity for all ExtStorage Providers.
3013

3014
  @type top_dirs: list
3015
  @param top_dirs: the list of directories in which to
3016
      search (if not given defaults to
3017
      L{pathutils.ES_SEARCH_PATH})
3018
  @rtype: list of L{objects.ExtStorage}
3019
  @return: a list of tuples (name, path, status, diagnose, parameters)
3020
      for all (potential) ExtStorage Providers under all
3021
      search paths, where:
3022
          - name is the (potential) ExtStorage Provider
3023
          - path is the full path to the ExtStorage Provider
3024
          - status True/False is the validity of the ExtStorage Provider
3025
          - diagnose is the error message for an invalid ExtStorage Provider,
3026
            otherwise empty
3027
          - parameters is a list of (name, help) parameters, if any
3028

3029
  """
3030
  if top_dirs is None:
3031
    top_dirs = pathutils.ES_SEARCH_PATH
3032

    
3033
  result = []
3034
  for dir_name in top_dirs:
3035
    if os.path.isdir(dir_name):
3036
      try:
3037
        f_names = utils.ListVisibleFiles(dir_name)
3038
      except EnvironmentError, err:
3039
        logging.exception("Can't list the ExtStorage directory %s: %s",
3040
                          dir_name, err)
3041
        break
3042
      for name in f_names:
3043
        es_path = utils.PathJoin(dir_name, name)
3044
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
3045
        if status:
3046
          diagnose = ""
3047
          parameters = es_inst.supported_parameters
3048
        else:
3049
          diagnose = es_inst
3050
          parameters = []
3051
        result.append((name, es_path, status, diagnose, parameters))
3052

    
3053
  return result
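# Illustrative sketch (not in the original source): a caller could log the
# providers that fail validation like this; which names show up depends on
# what exists under the search path.
#
#   for (name, path, status, diagnose, _) in DiagnoseExtStorage():
#     if not status:
#       logging.warning("ExtStorage provider %s (%s): %s",
#                       name, path, diagnose)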


def BlockdevGrow(disk, amount, dryrun, backingstore, excl_stor):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.DT_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.DT_PLAIN:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
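# Illustrative note (not in the original source): for a DRBD8 disk the
# recursion above descends into children[0], the underlying plain LVM
# device, so the returned (vg, lv) pair always names a freshly created LVM
# snapshot volume.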


def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk on which to set the information
  @type info: string
  @param info: new 'info' metadata
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
               "%s" % nic.network)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.logical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
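# Illustrative note (not in the original source): the resulting export
# config is a plain INI document; assuming the usual section names it looks
# roughly like
#
#   [export]
#   version = 0
#   timestamp = ...
#   [instance]
#   name = ...
#   nic0_mac = ...
#   disk0_dump = ...
#
# followed by the hypervisor, backend and OS parameter sections.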


def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: the serialized contents of the config file holding the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id); disk is
      an L{objects.Disk} object describing the current disk, and new
      unique_id is the name we rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether a given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  filestorage.CheckFileStoragePath(fs_dir)

  return os.path.normpath(fs_dir)


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should clean up
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      which will contain the error details in case we
      failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we use nocreat to fail if the device is not already there or we pass a
      # wrong path; we use notrunc to not attempt truncation on an LV device
      suffix = utils.BuildShellCmd("| dd of=%s conv=nocreat,notrunc bs=%s",
                                   real_disk.dev_path,
                                   str(1024 * 1024)) # 1 MB

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      real_disk = _OpenRealBD(disk)
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)
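# Illustrative note (not in the original source): the returned prefix/suffix
# bracket the transfer pipeline the import/export daemon runs; for a
# raw-disk export the full command line ends up shaped roughly like
#
#   dd if=<dev_path> bs=1048576 count=<size> | <transport>
#
# and for a raw-disk import the transport output is piped into
# "dd of=<dev_path> conv=nocreat,notrunc bs=1048576".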


def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if a
           status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(disks):
  """Finds attached L{BlockDev}s for the given disks.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we need to find

  @return: list of L{BlockDev} objects or C{None} if a given disk
           was not found or was not attached.

  """
  bdevs = []

  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      if multimaster:
        # In the multimaster case we have to wait explicitly until
        # the resource is Connected and UpToDate/UpToDate, because
        # we promote *both nodes* to primary directly afterwards.
        # Being in resync is not enough, since there is a race during which we
        # may promote a node with an Outdated disk to primary, effectively
        # tearing down the connection.
        all_connected = (all_connected and
                         stats.is_connected and
                         stats.is_disk_uptodate and
                         stats.peer_disk_uptodate)
      else:
        all_connected = (all_connected and
                         (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


def DrbdNeedsActivation(disks):
  """Checks which of the passed disks need activation and returns their UUIDs.

  """
  faulty_disks = []

  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      faulty_disks.append(disk)
      continue

    stats = rd.GetProcStatus()
    if stats.is_standalone or stats.is_diskless:
      faulty_disks.append(disk)

  return [disk.uuid for disk in faulty_disks]


def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
  try:
    return drbd.DRBD8.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type, hvparams=None):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode(hvparams=hvparams)


def _VerifyRestrictedCmdName(cmd):
  """Verifies a restricted command name.

  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  if not cmd.strip():
    return (False, "Missing command name")

  if os.path.basename(cmd) != cmd:
    return (False, "Invalid command name")

  if not constants.EXT_PLUGIN_MASK.match(cmd):
    return (False, "Command name contains forbidden characters")

  return (True, None)


def _CommonRestrictedCmdCheck(path, owner):
  """Common checks for restricted command file system directories and files.

  @type path: string
  @param path: Path to check
  @param owner: C{None} or tuple containing UID and GID
  @rtype: tuple; (boolean, string or C{os.stat} result)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's the result of C{os.stat}

  """
  if owner is None:
    # Default to root as owner
    owner = (0, 0)

  try:
    st = os.stat(path)
  except EnvironmentError, err:
    return (False, "Can't stat(2) '%s': %s" % (path, err))

  if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
    return (False, "Permissions on '%s' are too permissive" % path)

  if (st.st_uid, st.st_gid) != owner:
    (owner_uid, owner_gid) = owner
    return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))

  return (True, st)


def _VerifyRestrictedCmdDirectory(path, _owner=None):
  """Verifies restricted command directory.

  @type path: string
  @param path: Path to check
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  (status, value) = _CommonRestrictedCmdCheck(path, _owner)

  if not status:
    return (False, value)

  if not stat.S_ISDIR(value.st_mode):
    return (False, "Path '%s' is not a directory" % path)

  return (True, None)


def _VerifyRestrictedCmd(path, cmd, _owner=None):
  """Verifies a whole restricted command and returns its executable filename.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise the second element is the
    absolute path to the executable

  """
  executable = utils.PathJoin(path, cmd)

  (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)

  if not status:
    return (False, msg)

  if not utils.IsExecutable(executable):
    return (False, "access(2) thinks '%s' can't be executed" % executable)

  return (True, executable)


def _PrepareRestrictedCmd(path, cmd,
                          _verify_dir=_VerifyRestrictedCmdDirectory,
                          _verify_name=_VerifyRestrictedCmdName,
                          _verify_cmd=_VerifyRestrictedCmd):
  """Performs a number of tests on a restricted command.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @return: Same as L{_VerifyRestrictedCmd}

  """
  # Verify the directory first
  (status, msg) = _verify_dir(path)
  if status:
    # Check command if everything was alright
    (status, msg) = _verify_name(cmd)

  if not status:
    return (False, msg)

  # Check actual executable
  return _verify_cmd(path, cmd)
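# Illustrative note (not in the original source): the verification chain is
# directory -> name -> executable, so a call like
#
#   _PrepareRestrictedCmd("/path/to/restricted-commands", "mycmd")
#
# only returns (True, "/path/to/restricted-commands/mycmd") if the directory
# and file pass the ownership/permission checks and "mycmd" contains no
# forbidden characters; the path and command name here are made up for the
# example.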


def RunRestrictedCmd(cmd,
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
                     _sleep_fn=time.sleep,
                     _prepare_fn=_PrepareRestrictedCmd,
                     _runcmd_fn=utils.RunCmd,
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
  """Executes a restricted command after performing strict tests.

  @type cmd: string
  @param cmd: Command name
  @rtype: string
  @return: Command output
  @raise RPCFail: In case of an error

  """
  logging.info("Preparing to run restricted command '%s'", cmd)

  if not _enabled:
    _Fail("Restricted commands disabled at configure time")

  lock = None
  try:
    cmdresult = None
    try:
      lock = utils.FileLock.Open(_lock_file)
      lock.Exclusive(blocking=True, timeout=_lock_timeout)

      (status, value) = _prepare_fn(_path, cmd)

      if status:
        cmdresult = _runcmd_fn([value], env={}, reset_env=True,
                               postfork_fn=lambda _: lock.Unlock())
      else:
        logging.error(value)
    except Exception: # pylint: disable=W0703
      # Keep original error in log
      logging.exception("Caught exception")

    if cmdresult is None:
      logging.info("Sleeping for %0.1f seconds before returning",
                   _RCMD_INVALID_DELAY)
      _sleep_fn(_RCMD_INVALID_DELAY)

      # Do not include original error message in returned error
      _Fail("Executing command '%s' failed" % cmd)
    elif cmdresult.failed or cmdresult.fail_reason:
      _Fail("Restricted command '%s' failed: %s; output: %s",
            cmd, cmdresult.fail_reason, cmdresult.output)
    else:
      return cmdresult.output
  finally:
    if lock is not None:
      # Release lock at last
      lock.Close()
      lock = None


def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    logging.info("Received request to no longer pause watcher")
    utils.RemoveFile(_filename)
  else:
    logging.info("Received request to pause watcher until %s", until)

    if not ht.TNumber(until):
      _Fail("Duration must be numeric")

    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)
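# Illustrative usage sketch (not in the original source):
#
#   SetWatcherPause(time.time() + 1800)   # pause the watcher for 30 minutes
#   SetWatcherPause(None)                 # remove the pause file again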


def ConfigureOVS(ovs_name, ovs_link):
  """Creates an OpenvSwitch on the node.

  This function sets up an OpenvSwitch on the node with the given name
  and connects it via a given eth device.

  @type ovs_name: string
  @param ovs_name: Name of the OpenvSwitch to create.
  @type ovs_link: None or string
  @param ovs_link: Ethernet device for outside connection (can be missing)

  """
  # Initialize the OpenvSwitch
  result = utils.RunCmd(["ovs-vsctl", "add-br", ovs_name])
  if result.failed:
    _Fail("Failed to create openvswitch. Script return value: %s, output: '%s'"
          % (result.exit_code, result.output), log=True)

  # And connect it to a physical interface, if given
  if ovs_link:
    result = utils.RunCmd(["ovs-vsctl", "add-port", ovs_name, ovs_link])
    if result.failed:
      _Fail("Failed to connect openvswitch to interface %s. Script return"
            " value: %s, output: '%s'" % (ovs_link, result.exit_code,
            result.output), log=True)


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
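# Illustrative note (not in the original source): for hpath "instance-start"
# and the "pre" phase, RunHooks() runs the scripts under
# <hooks_base_dir>/instance-start-pre.d and returns tuples such as
# ("instance-start-pre.d/01-check", HKR_SUCCESS, "ok"); the script name is
# made up for the example.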


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata, ial_params):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data
    @type ial_params: list
    @param ial_params: the iallocator parameters

    @rtype: str
    @return: the stdout of the allocator; errors are reported by raising
       an RPC failure

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name] + ial_params)
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache file for %s: %s",
                        dev_path, err)
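# Illustrative note (not in the original source): _ConvertPath() maps e.g.
# "/dev/xenvg/disk-0" to "<BDEV_CACHE_DIR>/bdev_xenvg_disk-0", so each block
# device gets its own one-line cache file of the form
# "<owner> <primary|secondary> <iv_name>"; the device path is made up for
# the example.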