# lib/backend.py @ fc6ccde4
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103,C0302

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint

# C0302: This module has become too big and should be split up


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti.storage import bdev
from ganeti.storage import drbd
from ganeti.storage import filestorage
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster
from ganeti import ht
from ganeti.storage.base import BlockDev
from ganeti.storage.drbd import DRBD8
from ganeti import hooksmaster


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
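
# Example (illustrative sketch, not executed): a line as produced by
# "lvs --noheadings --separator=|" and how the regex splits it; the
# volume and attribute values are made up:
#
#   m = _LVSLINE_REGEX.match("  xenvg|test1|20.06|-wi-ao")
#   m.groups()  # -> ("xenvg", "test1", "20.06", "-wi-ao")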

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _GetInstReasonFilename(instance_name):
  """Path of the file containing the reason of the instance status change.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)


def _StoreInstReasonTrail(instance_name, trail):
  """Serialize a reason trail related to an instance change of state to file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: None

  """
  json = serializer.DumpJson(trail)
  filename = _GetInstReasonFilename(instance_name)
  utils.WriteFile(filename, data=json)
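
# Example (illustrative sketch, not executed): a reason trail is a list of
# (source, reason, timestamp) entries carried by an opcode; the values below
# are made up:
#
#   trail = [["gnt:opcode:startup", "user request", 1234567890]]
#   _StoreInstReasonTrail("instance1.example.com", trail)
#
# This writes the JSON-serialized trail to a file named after the instance
# under pathutils.INSTANCE_REASON_DIR.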


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
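
# Example (illustrative sketch, not executed): positional arguments are
# interpolated into the message; "exc=True" logs the current traceback and
# "log=False" skips logging before the RPCFail is raised:
#
#   _Fail("Instance %s is not running", iname)
#   _Fail("Cluster configuration incomplete: %s", err, exc=True)
#   _Fail("Handled elsewhere", log=False)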


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
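
# Example (illustrative sketch, not executed): round-tripping a payload the
# way an RPC client packs it before _Decompress unpacks it:
#
#   packed = (constants.RPC_ENCODING_ZLIB_BASE64,
#             base64.b64encode(zlib.compress("some payload")))
#   assert _Decompress(packed) == "some payload"
#
#   plain = (constants.RPC_ENCODING_NONE, "some payload")
#   assert _Decompress(plain) == "some payload"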


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
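
# Example (illustrative sketch, not executed): wiping the job queue while
# keeping the lock file, as JobQueuePurge below does; any path outside
# _ALLOWED_CLEAN_DIRS fails with RPCFail:
#
#   _CleanDirectory(pathutils.QUEUE_DIR,
#                   exclude=[pathutils.JOB_QUEUE_LOCK_FILE])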


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_node, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
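
# Example (illustrative sketch): the decorator is applied as done for
# ActivateMasterIp below; env_builder_fn receives the same arguments as
# the decorated function:
#
#   @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
#                  _BuildMasterIpEnv)
#   def ActivateMasterIp(master_params, use_external_mip_script):
#     ...
#
# Pre-hooks run before the function body; post-hooks run only if it
# returns without raising.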


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    RunLocalHooks decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report the failures back to the caller; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      raise RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
                    " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      raise RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
                    " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    raise RPCFail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _CheckStorageParams(params, num_params):
  """Performs sanity checks for storage parameters.

  @type params: list
  @param params: list of storage parameters
  @type num_params: int
  @param num_params: expected number of parameters

  """
  if params is None:
    raise errors.ProgrammerError("No storage parameters for storage"
                                 " reporting were provided.")
  if not isinstance(params, list):
    raise errors.ProgrammerError("The storage parameters are not of type"
                                 " list: '%s'" % params)
  if not len(params) == num_params:
    raise errors.ProgrammerError("Did not receive the expected number of"
                                 " storage parameters: expected %s,"
                                 " received '%s'" % (num_params, len(params)))


def _CheckLvmStorageParams(params):
  """Performs sanity check for the 'exclusive storage' flag.

  @see: C{_CheckStorageParams}

  """
  _CheckStorageParams(params, 1)
  excl_stor = params[0]
  if not isinstance(excl_stor, bool):
    raise errors.ProgrammerError("Exclusive storage parameter is not"
                                 " boolean: '%s'." % excl_stor)
  return excl_stor


def _GetLvmVgSpaceInfo(name, params):
  """Wrapper around C{_GetVgInfo} which checks the storage parameters.

  @type name: string
  @param name: name of the volume group
  @type params: list
  @param params: list of storage parameters, which in this case should
    contain only one parameter, for exclusive storage

  """
  excl_stor = _CheckLvmStorageParams(params)
  return _GetVgInfo(name, excl_stor)


def _GetVgInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVGInfo):
  """Retrieves information about an LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = info_fn([name], excl_stor)
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "type": constants.ST_LVM_VG,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }
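
# Example (illustrative sketch): shape of the dictionary returned by
# _GetVgInfo for a volume group with 10 GiB free out of 20 GiB; the values
# are in MiB and made up:
#
#   {"type": constants.ST_LVM_VG, "name": "xenvg",
#    "storage_free": 10240, "storage_size": 20480}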


def _GetLvmPvSpaceInfo(name, params):
  """Wrapper around C{_GetVgSpindlesInfo} with sanity checks.

  @see: C{_GetLvmVgSpaceInfo}

  """
  excl_stor = _CheckLvmStorageParams(params)
  return _GetVgSpindlesInfo(name, excl_stor)


def _GetVgSpindlesInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVgSpindlesInfo):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary with keys "type", "name", "storage_free" and
      "storage_size", giving the storage type, the VG name, the free
      spindles and the total spindles respectively

  """
  if excl_stor:
    (vg_free, vg_size) = info_fn(name)
  else:
    vg_free = 0
    vg_size = 0
  return {
    "type": constants.ST_LVM_PV,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }


def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) RAM in MiB
    - memory_total is the total amount of RAM in MiB
    - hv_version: the hypervisor version, if available

  @type hvparams: dict of string
  @param hvparams: the hypervisor's hvparams

  """
  return get_hv_fn(name).GetNodeInfo(hvparams=hvparams)


def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information for all hypervisors.

  See C{_GetHvInfo} for information on the output.

  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams

  """
  if hv_specs is None:
    return None

  result = []
  for hvname, hvparams in hv_specs:
    result.append(_GetHvInfo(hvname, hvparams, get_hv_fn))
  return result


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list of results.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(storage_units, hv_specs):
  """Gives back a hash with different information about the node.

  @type storage_units: list of tuples (string, string, list)
  @param storage_units: List of tuples (storage unit, identifier, parameters) to
    ask for disk space information. In case of lvm-vg, the identifier is
    the VG name. The parameters can contain additional, storage-type-specific
    parameters, for example exclusive storage for lvm storage.
  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
  @rtype: tuple; (string, None/list, None/list)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  storage_info = _GetNamedNodeInfo(
    storage_units,
    (lambda (storage_type, storage_key, storage_params):
        _ApplyStorageInfoFunction(storage_type, storage_key, storage_params)))
  hv_info = _GetHvInfoAll(hv_specs)
  return (bootid, storage_info, hv_info)


def _GetFileStorageSpaceInfo(path, params):
  """Wrapper around filestorage.GetSpaceInfo.

  The purpose of this wrapper is to call filestorage.GetFileStorageSpaceInfo
  and ignore the *args parameter to not leak it into the filestorage
  module's code.

  @see: C{filestorage.GetFileStorageSpaceInfo} for description of the
    parameters.

  """
  _CheckStorageParams(params, 0)
  return filestorage.GetFileStorageSpaceInfo(path)


# FIXME: implement storage reporting for all missing storage types.
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: _GetFileStorageSpaceInfo,
  constants.ST_LVM_PV: _GetLvmPvSpaceInfo,
  constants.ST_LVM_VG: _GetLvmVgSpaceInfo,
  constants.ST_SHARED_FILE: None,
  constants.ST_RADOS: None,
}


def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the results of the application of the storage space function (see
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
    storage type
  @raises NotImplementedError: for storage types that don't support space
    reporting yet

  """
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
  if fn is not None:
    return fn(storage_key, *args)
  else:
    raise NotImplementedError
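
# Example (illustrative sketch, not executed): dispatching a space report
# for an LVM volume group named "xenvg" with exclusive storage disabled;
# storage types mapped to None above raise NotImplementedError instead:
#
#   _ApplyStorageInfoFunction(constants.ST_LVM_VG, "xenvg", [False])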


def _CheckExclusivePvs(pvi_list):
  """Check that PVs are not shared among LVs.

  @type pvi_list: list of L{objects.LvmPvInfo} objects
  @param pvi_list: information about the PVs

  @rtype: list of tuples (string, list of strings)
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])

  """
  res = []
  for pvi in pvi_list:
    if len(pvi.lv_list) > 1:
      res.append((pvi.name, pvi.lv_list))
  return res


def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
                       get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hypervisor. Adds the results to the 'result' dictionary.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      hvparams = all_hvparams[hv_name]
      try:
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      result[constants.NV_HYPERVISOR][hv_name] = val


def _VerifyHvparams(what, vm_capable, result,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hvparams. Adds the results to the 'result' dictionary.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HVPARAMS in what:
    result[constants.NV_HVPARAMS] = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        get_hv_fn(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))


def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
  """Verifies the instance list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
                            all_hvparams=all_hvparams)
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val


def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
  """Verifies the node info.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_HVINFO in what and vm_capable:
    hvname = what[constants.NV_HVINFO]
    hyper = hypervisor.GetHypervisor(hvname)
    hvparams = all_hvparams[hvname]
    result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)


def _VerifyClientCertificate(cert_file=pathutils.NODED_CLIENT_CERT_FILE):
  """Verify the existence and validity of the client SSL certificate.

  """
  create_cert_cmd = "gnt-cluster renew-crypto --new-node-certificates"
  if not os.path.exists(cert_file):
    return (constants.CV_ERROR,
            "The client certificate does not exist. Run '%s' to create"
            " client certificates for all nodes." % create_cert_cmd)

  (errcode, msg) = utils.VerifyCertificate(cert_file)
  if errcode is not None:
    return (errcode, msg)
  else:
    # if everything is fine, we return the digest to be compared to the config
    return (None, utils.GetCertificateDigest(cert_filename=cert_file))


def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
  @type node_groups: a dict of strings
  @param node_groups: node _names_ mapped to their group uuids (it's enough to
      have only those nodes that are in `what["nodelist"]`)
  @type groups_cfg: a dict of dict of strings
  @param groups_cfg: a dictionary mapping group uuids to their configuration
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
  _VerifyHvparams(what, vm_capable, result)

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_CLIENT_CERT in what:
    result[constants.NV_CLIENT_CERT] = _VerifyClientCertificate()

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      params = groups_cfg.get(node_groups.get(node))
      ssh_port = params["ndparams"].get(constants.ND_SSH_PORT)
      logging.debug("Ssh port %s (None = default) for node %s",
                    str(ssh_port), node)
      success, message = _GetSshRunner(cluster_name). \
                            VerifyNodeHostname(node, ssh_port)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  _VerifyInstanceList(what, vm_capable, result, all_hvparams)

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_ACCEPTED_STORAGE_PATHS) == my_name:
    result[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        filestorage.ComputeWrongFileStoragePaths()

  if what.get(constants.NV_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_FILE_STORAGE_PATH] = pathresult

  if what.get(constants.NV_SHARED_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_SHARED_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_SHARED_FILE_STORAGE_PATH] = pathresult

  return result


def GetCryptoTokens(token_requests):
  """Perform actions on the node's cryptographic tokens.

  Token types can be 'ssl' or 'ssh'. So far only some actions are implemented
  for 'ssl'. Action 'get' returns the digest of the public client ssl
  certificate. Action 'create' creates a new client certificate and private key
  and also returns the digest of the certificate. The third element of a
  token request carries optional parameters for the action; so far only the
  filename is supported.

  @type token_requests: list of tuples of (string, string, dict), where the
    first string is in constants.CRYPTO_TYPES, the second in
    constants.CRYPTO_ACTIONS. The third parameter is a dictionary of string
    to string.
  @param token_requests: list of requests of cryptographic tokens and actions
    to perform on them. The actions come with a dictionary of options.
  @rtype: list of tuples (string, string)
  @return: list of tuples of the token type and the public crypto token

  """
  _VALID_CERT_FILES = [pathutils.NODED_CERT_FILE,
                       pathutils.NODED_CLIENT_CERT_FILE,
                       pathutils.NODED_CLIENT_CERT_FILE_TMP]
  _DEFAULT_CERT_FILE = pathutils.NODED_CLIENT_CERT_FILE
  tokens = []
  for (token_type, action, options) in token_requests:
    if token_type not in constants.CRYPTO_TYPES:
      raise errors.ProgrammerError("Token type '%s' not supported." %
                                   token_type)
    if action not in constants.CRYPTO_ACTIONS:
      raise errors.ProgrammerError("Action '%s' is not supported." %
                                   action)
    if token_type == constants.CRYPTO_TYPE_SSL_DIGEST:
      if action == constants.CRYPTO_ACTION_CREATE:
        cert_filename = None
        if options:
          cert_filename = options.get(constants.CRYPTO_OPTION_CERT_FILE)
        if not cert_filename:
          cert_filename = _DEFAULT_CERT_FILE
        # For security reasons, we don't allow arbitrary filenames
        if cert_filename not in _VALID_CERT_FILES:
          raise errors.ProgrammerError(
            "The certificate file name path '%s' is not allowed." %
            cert_filename)
        utils.GenerateNewSslCert(
          True, cert_filename,
          "Create new client SSL certificate in %s." % cert_filename)
        tokens.append((token_type,
                       utils.GetCertificateDigest(
                         cert_filename=cert_filename)))
      elif action == constants.CRYPTO_ACTION_GET:
        tokens.append((token_type,
                       utils.GetCertificateDigest()))
  return tokens


def GetBlockDevSizes(devices):
  """Return the size of the given block devices.

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs
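
# Example (illustrative sketch, not executed): paths outside /dev are
# silently skipped; the device and its size are made up:
#
#   GetBlockDevSizes(["/dev/sda1", "/etc/passwd"])
#   # -> {"/dev/sda1": 512}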


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs
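
# Example (illustrative sketch, not executed): shape of the mapping returned
# by GetVolumeList; the volume and its values are made up:
#
#   GetVolumeList(["xenvg"])
#   # -> {"xenvg/test1": ("20.06", False, True)}
#   # i.e. size in MiB as a string, the inactive flag and the online flag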
1324

    
1325

    
1326
def ListVolumeGroups():
1327
  """List the volume groups and their size.
1328

1329
  @rtype: dict
1330
  @return: dictionary with keys volume name and values the
1331
      size of the volume
1332

1333
  """
1334
  return utils.ListVolumeGroups()
1335

    
1336

    
1337
def NodeVolumes():
1338
  """List all volumes on this node.
1339

1340
  @rtype: list
1341
  @return:
1342
    A list of dictionaries, each having four keys:
1343
      - name: the logical volume name,
1344
      - size: the size of the logical volume
1345
      - dev: the physical device on which the LV lives
1346
      - vg: the volume group to which it belongs
1347

1348
    In case of errors, we return an empty list and log the
1349
    error.
1350

1351
    Note that since a logical volume can live on multiple physical
1352
    volumes, the resulting list might include a logical volume
1353
    multiple times.
1354

1355
  """
1356
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
1357
                         "--separator=|",
1358
                         "--options=lv_name,lv_size,devices,vg_name"])
1359
  if result.failed:
1360
    _Fail("Failed to list logical volumes, lvs output: %s",
1361
          result.output)
1362

    
1363
  def parse_dev(dev):
1364
    return dev.split("(")[0]
1365

    
1366
  def handle_dev(dev):
1367
    return [parse_dev(x) for x in dev.split(",")]
1368

    
1369
  def map_line(line):
1370
    line = [v.strip() for v in line]
1371
    return [{"name": line[0], "size": line[1],
1372
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
1373

    
1374
  all_devs = []
1375
  for line in result.stdout.splitlines():
1376
    if line.count("|") >= 3:
1377
      all_devs.extend(map_line(line.split("|")))
1378
    else:
1379
      logging.warning("Strange line in the output from lvs: '%s'", line)
1380
  return all_devs
1381

    
1382

    
1383
def BridgesExist(bridges_list):
1384
  """Check if a list of bridges exist on the current node.
1385

1386
  @rtype: boolean
1387
  @return: C{True} if all of them exist, C{False} otherwise
1388

1389
  """
1390
  missing = []
1391
  for bridge in bridges_list:
1392
    if not utils.BridgeExists(bridge):
1393
      missing.append(bridge)
1394

    
1395
  if missing:
1396
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
1397

    
1398

    
1399
def GetInstanceListForHypervisor(hname, hvparams=None,
1400
                                 get_hv_fn=hypervisor.GetHypervisor):
1401
  """Provides a list of instances of the given hypervisor.
1402

1403
  @type hname: string
1404
  @param hname: name of the hypervisor
1405
  @type hvparams: dict of strings
1406
  @param hvparams: hypervisor parameters for the given hypervisor
1407
  @type get_hv_fn: function
1408
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
1409
    name; optional parameter to increase testability
1410

1411
  @rtype: list
1412
  @return: a list of all running instances on the current node
1413
    - instance1.example.com
1414
    - instance2.example.com
1415

1416
  """
1417
  results = []
1418
  try:
1419
    hv = get_hv_fn(hname)
1420
    names = hv.ListInstances(hvparams=hvparams)
1421
    results.extend(names)
1422
  except errors.HypervisorError, err:
1423
    _Fail("Error enumerating instances (hypervisor %s): %s",
1424
          hname, err, exc=True)
1425
  return results
1426

    
1427

    
1428
def GetInstanceList(hypervisor_list, all_hvparams=None,
1429
                    get_hv_fn=hypervisor.GetHypervisor):
1430
  """Provides a list of instances.
1431

1432
  @type hypervisor_list: list
1433
  @param hypervisor_list: the list of hypervisors to query information
1434
  @type all_hvparams: dict of dict of strings
1435
  @param all_hvparams: a dictionary mapping hypervisor types to respective
1436
    cluster-wide hypervisor parameters
1437
  @type get_hv_fn: function
1438
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
1439
    name; optional parameter to increase testability
1440

1441
  @rtype: list
1442
  @return: a list of all running instances on the current node
1443
    - instance1.example.com
1444
    - instance2.example.com
1445

1446
  """
1447
  results = []
1448
  for hname in hypervisor_list:
1449
    hvparams = all_hvparams[hname]
1450
    results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
1451
                                                get_hv_fn=get_hv_fn))
1452
  return results
1453

    
1454

    
1455
def GetInstanceInfo(instance, hname, hvparams=None):
1456
  """Gives back the information about an instance as a dictionary.
1457

1458
  @type instance: string
1459
  @param instance: the instance name
1460
  @type hname: string
1461
  @param hname: the hypervisor type of the instance
1462
  @type hvparams: dict of strings
1463
  @param hvparams: the instance's hvparams
1464

1465
  @rtype: dict
1466
  @return: dictionary with the following keys:
1467
      - memory: memory size of instance (int)
1468
      - state: state of instance (HvInstanceState)
1469
      - time: cpu time of instance (float)
1470
      - vcpus: the number of vcpus (int)
1471

1472
  """
1473
  output = {}
1474

    
1475
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance,
1476
                                                          hvparams=hvparams)
1477
  if iinfo is not None:
1478
    output["memory"] = iinfo[2]
1479
    output["vcpus"] = iinfo[3]
1480
    output["state"] = iinfo[4]
1481
    output["time"] = iinfo[5]
1482

    
1483
  return output
1484

    
1485

    
1486
def GetInstanceMigratable(instance):
1487
  """Computes whether an instance can be migrated.
1488

1489
  @type instance: L{objects.Instance}
1490
  @param instance: object representing the instance to be checked.
1491

1492
  @rtype: tuple
1493
  @return: tuple of (result, description) where:
1494
      - result: whether the instance can be migrated or not
1495
      - description: a description of the issue, if relevant
1496

1497
  """
1498
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1499
  iname = instance.name
1500
  if iname not in hyper.ListInstances(instance.hvparams):
1501
    _Fail("Instance %s is not running", iname)
1502

    
1503
  for idx in range(len(instance.disks)):
1504
    link_name = _GetBlockDevSymlinkPath(iname, idx)
1505
    if not os.path.islink(link_name):
1506
      logging.warning("Instance %s is missing symlink %s for disk %d",
1507
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list, all_hvparams):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: mapping of hypervisor names to hvparams

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo(hvparams)
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output
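

# Illustrative note (values hypothetical): the mapping returned above looks
# like
#   {"instance1.example.com": {"memory": 128, "vcpus": 1,
#                              "state": "running", "time": 12.5}}
# where "state" is whatever string the hypervisor reports for the instance.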


def GetInstanceConsoleInfo(instance_param_dict,
                           get_hv_fn=hypervisor.GetHypervisor):
  """Gather data about the console access of a set of instances on this node.

  This function assumes that the caller already knows which instances are on
  this node, by calling a function such as L{GetAllInstancesInfo} or
  L{GetInstanceList}.

  For every instance, a large amount of configuration data needs to be
  provided to the hypervisor interface in order to receive the console
  information. Whether this could or should be cut down can be discussed.
  The information is provided in a dictionary indexed by instance name,
  allowing any number of instance queries to be done.

  @type instance_param_dict: dict of string to tuple of dictionaries, where the
    dictionaries represent: L{objects.Instance}, L{objects.Node},
    L{objects.NodeGroup}, HvParams, BeParams
  @param instance_param_dict: mapping of instance name to parameters necessary
    for console information retrieval

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - instance: instance name
      - kind: console kind
      - message: used with kind == CONS_MESSAGE, indicates console to be
                 unavailable, supplies error message
      - host: host to connect to
      - port: port to use
      - user: user for login
      - command: the command, broken into parts as an array
      - display: unknown, potentially unused?

  """
  output = {}
  for inst_name in instance_param_dict:
    instance = instance_param_dict[inst_name]["instance"]
    pnode = instance_param_dict[inst_name]["node"]
    group = instance_param_dict[inst_name]["group"]
    hvparams = instance_param_dict[inst_name]["hvParams"]
    beparams = instance_param_dict[inst_name]["beParams"]

    instance = objects.Instance.FromDict(instance)
    pnode = objects.Node.FromDict(pnode)
    group = objects.NodeGroup.FromDict(group)

    h = get_hv_fn(instance.hypervisor)
    output[inst_name] = h.GetInstanceConsole(instance, pnode, group,
                                             hvparams, beparams).ToDict()

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
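

# A minimal sketch (not called anywhere in this module) of the log name
# layout produced above: "<kind>-<os_name>-<instance>[-<component>]-
# <timestamp>.log" under pathutils.LOG_OS_DIR. The operation, OS and
# instance names are hypothetical examples.
def _ExampleInstanceLogName():
  return _InstanceLogName("add", "debootstrap", "instance1.example.com", None)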


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1666

    
1667

    
1668
def RunRenameInstance(instance, old_name, debug):
1669
  """Run the OS rename script for an instance.
1670

1671
  @type instance: L{objects.Instance}
1672
  @param instance: Instance whose OS is to be installed
1673
  @type old_name: string
1674
  @param old_name: previous instance name
1675
  @type debug: integer
1676
  @param debug: debug level, passed to the OS scripts
1677
  @rtype: boolean
1678
  @return: the success of the operation
1679

1680
  """
1681
  inst_os = OSFromDisk(instance.os)
1682

    
1683
  rename_env = OSEnvironment(instance, inst_os, debug)
1684
  rename_env["OLD_INSTANCE_NAME"] = old_name
1685

    
1686
  logfile = _InstanceLogName("rename", instance.os,
1687
                             "%s-%s" % (old_name, instance.name), None)
1688

    
1689
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1690
                        cwd=inst_os.path, output=logfile, reset_env=True)
1691

    
1692
  if result.failed:
1693
    logging.error("os create command '%s' returned error: %s output: %s",
1694
                  result.cmd, result.fail_reason, result.output)
1695
    lines = [utils.SafeEncode(val)
1696
             for val in utils.TailFile(logfile, lines=20)]
1697
    _Fail("OS rename script failed (%s), last lines in the"
1698
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Returns symlink path for block device.

  """
  if _dir is None:
    _dir = pathutils.DISK_LINKS_DIR

  return utils.PathJoin(_dir,
                        ("%s%s%s" %
                         (instance_name, constants.DISK_SEPARATOR, idx)))
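

# A minimal sketch (not called anywhere in this module): disk 0 of a
# hypothetical instance maps to the link
# "<DISK_LINKS_DIR>/instance1.example.com<DISK_SEPARATOR>0", which is what
# _SymlinkBlockDev below creates and _RemoveBlockDevLinks removes.
def _ExampleBlockDevSymlinkPath():
  return _GetBlockDevSymlinkPath("instance1.example.com", 0)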


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _CalculateDeviceURI(instance, disk, device):
  """Get the URI for the device.

  @type instance: L{objects.Instance}
  @param instance: the instance to which the disk belongs
  @type disk: L{objects.Disk}
  @param disk: the target disk object
  @type device: L{bdev.BlockDev}
  @param device: the corresponding BlockDevice
  @rtype: string
  @return: the device URI if any, else None

  """
  access_mode = disk.params.get(constants.LDP_ACCESS,
                                constants.DISK_KERNELSPACE)
  if access_mode == constants.DISK_USERSPACE:
    # This can raise errors.BlockDeviceError
    return device.GetUserspaceAccessUri(instance.hypervisor)
  else:
    return None


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, link_name, drive_uri)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)
    uri = _CalculateDeviceURI(instance, disk, device)

    block_devices.append((disk, link_name, uri))

  return block_devices


def StartInstance(instance, startup_paused, reason, store_reason=True):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @type reason: list of reasons
  @param reason: the reason trail for this startup
  @type store_reason: boolean
  @param store_reason: whether to store the startup reason trail on file
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
    if store_reason:
      _StoreInstReasonTrail(instance.name, reason)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout, reason, store_reason=True):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this shutdown
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances(instance.hvparams):
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances(instance.hvparams):
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
        if store_reason:
          _StoreInstReasonTrail(instance.name, reason)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances(instance.hvparams):
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances(instance.hvparams):
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances(instance.hvparams):
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)
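

# A minimal sketch (not called anywhere in this module): the _TryShutdown
# pattern above generalizes to any "poll until done" task. utils.Retry
# re-invokes the callable every 5 seconds for as long as it raises
# utils.RetryAgain, returning its result on success or raising
# utils.RetryTimeout once the timeout expires. The counter below is a
# stand-in for a real condition check.
def _ExampleRetryUsage():
  state = {"attempts": 0}

  def _Poll():
    state["attempts"] += 1
    if state["attempts"] < 3:
      raise utils.RetryAgain()
    return state["attempts"]

  return utils.Retry(_Poll, 5, 60)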
1914

    
1915

    
1916
def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
1917
  """Reboot an instance.
1918

1919
  @type instance: L{objects.Instance}
1920
  @param instance: the instance object to reboot
1921
  @type reboot_type: str
1922
  @param reboot_type: the type of reboot, one the following
1923
    constants:
1924
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1925
        instance OS, do not recreate the VM
1926
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1927
        restart the VM (at the hypervisor level)
1928
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1929
        not accepted here, since that mode is handled differently, in
1930
        cmdlib, and translates into full stop and start of the
1931
        instance (instead of a call_instance_reboot RPC)
1932
  @type shutdown_timeout: integer
1933
  @param shutdown_timeout: maximum timeout for soft shutdown
1934
  @type reason: list of reasons
1935
  @param reason: the reason trail for this reboot
1936
  @rtype: None
1937

1938
  """
1939
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
1940
                                                   instance.hvparams)
1941

    
1942
  if instance.name not in running_instances:
1943
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1944

    
1945
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1946
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1947
    try:
1948
      hyper.RebootInstance(instance)
1949
    except errors.HypervisorError, err:
1950
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1951
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1952
    try:
1953
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
1954
      result = StartInstance(instance, False, reason, store_reason=False)
1955
      _StoreInstReasonTrail(instance.name, reason)
1956
      return result
1957
    except errors.HypervisorError, err:
1958
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1959
  else:
1960
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances(instance.hvparams)
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(cluster_name, instance, target, live):
  """Migrates an instance to another node.

  @type cluster_name: string
  @param cluster_name: name of the cluster
  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(cluster_name, instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status.

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def HotplugDevice(instance, action, dev_type, device, extra, seq):
  """Hotplug a device.

  Hotplug is currently supported only for the KVM hypervisor.

  @type instance: L{objects.Instance}
  @param instance: the instance to which we hotplug a device
  @type action: string
  @param action: the hotplug action to perform
  @type dev_type: string
  @param dev_type: the device type to hotplug
  @type device: either L{objects.NIC} or L{objects.Disk}
  @param device: the device object to hotplug
  @type extra: string
  @param extra: extra info used by hotplug code (e.g. disk link)
  @type seq: int
  @param seq: the index of the device from master perspective
  @raise RPCFail: in case instance does not have KVM hypervisor

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.VerifyHotplugSupport(instance, action, dev_type)
  except errors.HotplugError, err:
    _Fail("Hotplug is not supported: %s", err)

  if action == constants.HOTPLUG_ACTION_ADD:
    fn = hyper.HotAddDevice
  elif action == constants.HOTPLUG_ACTION_REMOVE:
    fn = hyper.HotDelDevice
  elif action == constants.HOTPLUG_ACTION_MODIFY:
    fn = hyper.HotModDevice
  else:
    assert action in constants.HOTPLUG_ALL_ACTIONS

  return fn(instance, dev_type, device, extra, seq)


def HotplugSupported(instance):
  """Checks if hotplug is generally supported.

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.HotplugSupported(instance)
  except errors.HotplugError, err:
    _Fail("Hotplug is not supported: %s", err)


def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
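

# Illustrative note (device path hypothetical): since "bs" is 1 MiB and both
# "seek" and "count" are expressed in blocks of that size, a call such as
# _WipeDevice("/dev/drbd0", 512, 128) is equivalent to running
#   dd if=/dev/zero seek=512 bs=1048576 oflag=direct of=/dev/drbd0 count=128
# i.e. it zeroes the 128 MiB region starting 512 MiB into the device.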


def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe are bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disks we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path

    def _TryRemove():
      try:
        rdev.Remove()
        return []
      except errors.BlockDeviceError, err:
        return [str(err)]

    msgs.extend(utils.SimpleRetry([], _TryRemove,
                                  constants.DISK_REMOVE_RETRY_INTERVAL,
                                  constants.DISK_REMOVE_RETRY_TIMEOUT))

    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result
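

# Illustrative note on the "mcn" arithmetic above (numbers hypothetical): for
# a mirrored disk with two children whose ChildrenNeeded() returns 1,
# mcn = 2 - 1 = 1, so the loop tolerates at most one child that failed to
# activate (appended as None) before re-raising the activation error.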


def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: tuple
  @return: a tuple with the C{/dev/...} path and the created symlink
      for primary nodes, and (C{True}, C{True}) for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, BlockDev):
      # pylint: disable=E1103
      dev_path = result.dev_path
      link_name = None
      if as_primary:
        link_name = _SymlinkBlockDev(owner, dev_path, idx)
    elif result:
      return result, result
    else:
      _Fail("Unexpected result from _RecursiveAssembleBD")
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return dev_path, link_name


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble: the shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list of L{objects.BlockDevStatus}
  @return: one status object for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list of tuples
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result
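

# Illustrative note (messages hypothetical): for two disks where only the
# first can be found, the list returned above would look like
#   [(True, <objects.BlockDevStatus>), (False, "Can't find device ...")]
# so callers get per-disk errors without losing the results that succeeded.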


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           sync status information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetdimensions(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the pair (size, spindles), where spindles is None if the
      device doesn't support that

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualDimensions())
  return result


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
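

# Illustrative note: the API file (constants.OS_API_FILE) simply lists one
# integer version per line; e.g. a file containing the two lines "20" and
# "15" makes the function above return (True, [20, 15]), while non-numeric
# content yields a (False, <error message>) tuple instead.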


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result
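

# Illustrative note (OS name and parameter hypothetical): for an OS named
# "debootstrap+default" at API version 20 with os_params={"dhcp": "yes"} and
# debug=0, the dictionary built above contains, among others:
#   OS_API_VERSION="20", OS_NAME="debootstrap", OS_VARIANT="default",
#   OSP_DHCP="yes", DEBUG_LEVEL="0" and PATH=constants.HOOKS_PATH.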


def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    result["DISK_%d_UUID" % idx] = disk.uuid
    if disk.name:
      result["DISK_%d_NAME" % idx] = disk.name
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.DTS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type in constants.DTS_FILEBASED:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.logical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    result["NIC_%d_UUID" % idx] = nic.uuid
    if nic.name:
      result["NIC_%d_NAME" % idx] = nic.name
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.netinfo:
      nobj = objects.Network.FromDict(nic.netinfo)
      result.update(nobj.HooksDict("NIC_%d_" % idx))
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
3062

    
3063

    
3064
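# In addition to the OSCoreEnv keys, OSEnvironment adds per-instance data;
# a two-disk, one-NIC instance would for example see (hypothetical values):
#
#   INSTANCE_NAME, HYPERVISOR, DISK_COUNT=2, NIC_COUNT=1,
#   DISK_0_PATH=/dev/drbd0, DISK_0_ACCESS=rw, NIC_0_MAC=aa:00:00:35:xx:xx,
#   NIC_0_MODE=bridged, NIC_0_BRIDGE=xen-br0, INSTANCE_BE_MAXMEM=...
#
# OS scripts and hooks read these variables instead of querying the cluster.

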
def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list of L{objects.ExtStorage}
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result


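# Sketch of the returned structure (all provider names, paths and messages
# below are hypothetical): a valid provider and a broken one would show up as
#
#   [("ext-foo", "/path/to/ext-foo", True, "",
#     [("vol_type", "volume type to create")]),
#    ("ext-bar", "/path/to/ext-bar", False, "missing attach script", [])]

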
def BlockdevGrow(disk, amount, dryrun, backingstore, excl_stor):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.DT_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.DT_PLAIN:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk whose metadata is to be set
  @type info: string
  @param info: new 'info' metadata
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
               "%s" % nic.network)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.logical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


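# The file written above (constants.EXPORT_CONF_FILE) is a plain INI file;
# a minimal sketch with made-up values, assuming the INISECT_* constants map
# to section names like "export" and "instance":
#
#   [export]
#   version = 0
#   timestamp = 1356006000
#   source = node1.example.com
#   os = debootstrap+default
#   compression = none
#
#   [instance]
#   name = inst1.example.com
#   maxmem = 512
#   ...
#
# ExportInfo below reads this file back and validates the two sections.

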
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: str
  @return: a serialized form of the export configuration

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id); disk is
      an L{objects.Disk} object describing the current disk, and new
      unique_id is the name we rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether a given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  filestorage.CheckFileStoragePath(fs_dir)

  return os.path.normpath(fs_dir)


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should clean up
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      will contain the error details in case we failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


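# Example (the directory below is hypothetical; the real base comes from
# pathutils.CRYPTO_KEYS_DIR):
#
#   _GetX509Filenames("/path/to/crypto", "x509-abc")
#   -> ("/path/to/crypto/x509-abc",
#       "/path/to/crypto/x509-abc/key",
#       "/path/to/crypto/x509-abc/cert")

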
def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we use nocreat to fail if the device is not already there or we pass a
      # wrong path; we use notrunc to avoid attempting to truncate an LV device
      suffix = utils.BuildShellCmd("| dd of=%s conv=nocreat,notrunc bs=%s",
                                   real_disk.dev_path,
                                   str(1024 * 1024)) # 1 MiB

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MiB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      real_disk = _OpenRealBD(disk)
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


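# For IEIO_RAW_DISK the returned prefix/suffix are spliced around the
# transfer program by the import/export daemon; an export would roughly run
# (device path hypothetical):
#
#   dd if=/dev/xenvg/disk0 bs=1048576 count=<disk.size> | <network transfer>
#
# while an import pipes the received data into
#
#   dd of=/dev/xenvg/disk0 conv=nocreat,notrunc bs=1048576

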
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: a list with the state of each named import/export; an entry is
           C{None} if its status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(disks):
  """Finds attached L{BlockDev}s for the given disks.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we need to find

  @return: list of L{BlockDev} objects
  @raise RPCFail: if a given disk was not found or is not attached

  """
  bdevs = []

  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      if multimaster:
        # In the multimaster case we have to wait explicitly until
        # the resource is Connected and UpToDate/UpToDate, because
        # we promote *both nodes* to primary directly afterwards.
        # Being in resync is not enough, since there is a race during which we
        # may promote a node with an Outdated disk to primary, effectively
        # tearing down the connection.
        all_connected = (all_connected and
                         stats.is_connected and
                         stats.is_disk_uptodate and
                         stats.peer_disk_uptodate)
      else:
        all_connected = (all_connected and
                         (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


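# Note on the retry schedule above (interpretation of the arguments as used
# here): utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60) appears to start
# polling at 0.1s, grow the delay by a factor of 1.5 up to a 5s cap, and
# give up after two minutes overall, at which point RetryTimeout is raised.

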
def DrbdWaitSync(disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


def DrbdNeedsActivation(disks):
  """Checks which of the passed disks need activation and returns their UUIDs.

  """
  faulty_disks = []

  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      faulty_disks.append(disk)
      continue

    stats = rd.GetProcStatus()
    if stats.is_standalone or stats.is_diskless:
      faulty_disks.append(disk)

  return [disk.uuid for disk in faulty_disks]


def GetDrbdUsermodeHelper():
  """Returns the DRBD usermode helper currently configured.

  """
  try:
    return drbd.DRBD8.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type, hvparams=None):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode(hvparams=hvparams)


def _VerifyRestrictedCmdName(cmd):
  """Verifies a restricted command name.

  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  if not cmd.strip():
    return (False, "Missing command name")

  if os.path.basename(cmd) != cmd:
    return (False, "Invalid command name")

  if not constants.EXT_PLUGIN_MASK.match(cmd):
    return (False, "Command name contains forbidden characters")

  return (True, None)


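# Illustrative outcomes (assuming constants.EXT_PLUGIN_MASK accepts the
# usual alphanumeric/dash style names; exact pattern depends on constants):
#   _VerifyRestrictedCmdName("fence-node")  -> (True, None)
#   _VerifyRestrictedCmdName("sub/dir/cmd") -> (False, "Invalid command name")
#   _VerifyRestrictedCmdName("  ")          -> (False, "Missing command name")

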
def _CommonRestrictedCmdCheck(path, owner):
  """Common checks for restricted command file system directories and files.

  @type path: string
  @param path: Path to check
  @param owner: C{None} or tuple containing UID and GID
  @rtype: tuple; (boolean, string or C{os.stat} result)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's the result of C{os.stat}

  """
  if owner is None:
    # Default to root as owner
    owner = (0, 0)

  try:
    st = os.stat(path)
  except EnvironmentError, err:
    return (False, "Can't stat(2) '%s': %s" % (path, err))

  if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
    return (False, "Permissions on '%s' are too permissive" % path)

  if (st.st_uid, st.st_gid) != owner:
    (owner_uid, owner_gid) = owner
    return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))

  return (True, st)


def _VerifyRestrictedCmdDirectory(path, _owner=None):
  """Verifies restricted command directory.

  @type path: string
  @param path: Path to check
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  (status, value) = _CommonRestrictedCmdCheck(path, _owner)

  if not status:
    return (False, value)

  if not stat.S_ISDIR(value.st_mode):
    return (False, "Path '%s' is not a directory" % path)

  return (True, None)


def _VerifyRestrictedCmd(path, cmd, _owner=None):
  """Verifies a whole restricted command and returns its executable filename.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise the second element is the
    absolute path to the executable

  """
  executable = utils.PathJoin(path, cmd)

  (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)

  if not status:
    return (False, msg)

  if not utils.IsExecutable(executable):
    return (False, "access(2) thinks '%s' can't be executed" % executable)

  return (True, executable)


def _PrepareRestrictedCmd(path, cmd,
                          _verify_dir=_VerifyRestrictedCmdDirectory,
                          _verify_name=_VerifyRestrictedCmdName,
                          _verify_cmd=_VerifyRestrictedCmd):
  """Performs a number of tests on a restricted command.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @return: Same as L{_VerifyRestrictedCmd}

  """
  # Verify the directory first
  (status, msg) = _verify_dir(path)
  if status:
    # Check command if everything was alright
    (status, msg) = _verify_name(cmd)

  if not status:
    return (False, msg)

  # Check actual executable
  return _verify_cmd(path, cmd)


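# The checks above run in order: directory (ownership/permissions), then
# command name, then the executable itself; the first failure
# short-circuits. Sketch (paths hypothetical):
#
#   _PrepareRestrictedCmd("/path/to/restricted-commands", "fence-node")
#   -> (True, "/path/to/restricted-commands/fence-node")

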
def RunRestrictedCmd(cmd,
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
                     _sleep_fn=time.sleep,
                     _prepare_fn=_PrepareRestrictedCmd,
                     _runcmd_fn=utils.RunCmd,
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
  """Executes a restricted command after performing strict tests.

  @type cmd: string
  @param cmd: Command name
  @rtype: string
  @return: Command output
  @raise RPCFail: In case of an error

  """
  logging.info("Preparing to run restricted command '%s'", cmd)

  if not _enabled:
    _Fail("Restricted commands disabled at configure time")

  lock = None
  try:
    cmdresult = None
    try:
      lock = utils.FileLock.Open(_lock_file)
      lock.Exclusive(blocking=True, timeout=_lock_timeout)

      (status, value) = _prepare_fn(_path, cmd)

      if status:
        cmdresult = _runcmd_fn([value], env={}, reset_env=True,
                               postfork_fn=lambda _: lock.Unlock())
      else:
        logging.error(value)
    except Exception: # pylint: disable=W0703
      # Keep original error in log
      logging.exception("Caught exception")

    if cmdresult is None:
      logging.info("Sleeping for %0.1f seconds before returning",
                   _RCMD_INVALID_DELAY)
      _sleep_fn(_RCMD_INVALID_DELAY)

      # Do not include original error message in returned error
      _Fail("Executing command '%s' failed" % cmd)
    elif cmdresult.failed or cmdresult.fail_reason:
      _Fail("Restricted command '%s' failed: %s; output: %s",
            cmd, cmdresult.fail_reason, cmdresult.output)
    else:
      return cmdresult.output
  finally:
    if lock is not None:
      # Release lock at last
      lock.Close()
      lock = None


def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    logging.info("Received request to no longer pause watcher")
    utils.RemoveFile(_filename)
  else:
    logging.info("Received request to pause watcher until %s", until)

    if not ht.TNumber(until):
      _Fail("Duration must be numeric")

    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)


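# Usage sketch: pausing the watcher for 30 minutes from now (the absolute
# timestamp is computed by the caller) versus unpausing:
#
#   SetWatcherPause(time.time() + 1800)
#   SetWatcherPause(None)

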
def ConfigureOVS(ovs_name, ovs_link):
  """Creates an OpenvSwitch on the node.

  This function sets up an OpenvSwitch on the node with the given name
  and connects it via a given Ethernet device.

  @type ovs_name: string
  @param ovs_name: Name of the OpenvSwitch to create.
  @type ovs_link: None or string
  @param ovs_link: Ethernet device for outside connection (can be missing)

  """
  # Initialize the OpenvSwitch
  result = utils.RunCmd(["ovs-vsctl", "add-br", ovs_name])
  if result.failed:
    _Fail("Failed to create openvswitch. Script return value: %s, output: '%s'"
          % (result.exit_code, result.output), log=True)

  # And connect it to a physical interface, if given
  if ovs_link:
    result = utils.RunCmd(["ovs-vsctl", "add-port", ovs_name, ovs_link])
    if result.failed:
      _Fail("Failed to connect openvswitch to interface %s. Script return"
            " value: %s, output: '%s'" % (ovs_link, result.exit_code,
            result.output), log=True)


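# ConfigureOVS("switch1", "eth0") is roughly equivalent to running
# (names hypothetical; the second command is skipped when ovs_link is
# empty):
#
#   ovs-vsctl add-br switch1
#   ovs-vsctl add-port switch1 eth0

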
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata, ial_params):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data
    @type ial_params: list
    @param ial_params: the iallocator parameters

    @rtype: str
    @return: the stdout of the allocator script; failures (script not
        found or non-zero exit code) are reported back to the master
        via L{_Fail}

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name] + ial_params)
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
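
  # Illustrative sketch (editor's addition, not in the original module):
  # "hail" is used purely as an example allocator name, and "request" is
  # a hypothetical, already-built request dictionary:
  #
  #   stdout = IAllocatorRunner.Run("hail", serializer.DumpJson(request), [])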


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath
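
  # Illustrative sketch (editor's addition, not in the original module):
  # assuming pathutils.BDEV_CACHE_DIR were "/var/run/ganeti/bdev-cache"
  # (the real value depends on the build), then:
  #
  #   _ConvertPath("/dev/xvda")
  #     -> "/var/run/ganeti/bdev-cache/bdev_xvda"
  #   _ConvertPath("/dev/mapper/vg-lv")
  #     -> "/var/run/ganeti/bdev-cache/bdev_mapper_vg-lv"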
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
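
  # Illustrative sketch (editor's addition, not in the original module):
  # UpdateCache("/dev/xvda", "instance1.example.com", True, "sda") would
  # write a cache file "bdev_xvda" containing the single line (all names
  # invented for the example):
  #
  #   instance1.example.com primary sda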
  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)
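
  # Illustrative sketch (editor's addition, not in the original module):
  # removing the entry written in the UpdateCache example above:
  #
  #   DevCacheManager.RemoveCache("/dev/xvda")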