Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 1a182390

History | View | Annotate | Download (145.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable=E1103,C0302
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37
# C0302: This module has become too big and should be split up
38

    
39

    
40
import os
41
import os.path
42
import shutil
43
import time
44
import stat
45
import errno
46
import re
47
import random
48
import logging
49
import tempfile
50
import zlib
51
import base64
52
import signal
53

    
54
from ganeti import errors
55
from ganeti import utils
56
from ganeti import ssh
57
from ganeti import hypervisor
58
from ganeti import constants
59
from ganeti.storage import bdev
60
from ganeti.storage import drbd
61
from ganeti.storage import filestorage
62
from ganeti import objects
63
from ganeti import ssconf
64
from ganeti import serializer
65
from ganeti import netutils
66
from ganeti import runtime
67
from ganeti import compat
68
from ganeti import pathutils
69
from ganeti import vcluster
70
from ganeti import ht
71
from ganeti.storage.base import BlockDev
72
from ganeti.storage.drbd import DRBD8
73
from ganeti import hooksmaster
74

    
75

    
76
# Kernel-provided boot ID; it changes on every reboot, so it is used to
# detect whether the node rebooted between checks.
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"

# Whitelist of directories _CleanDirectory may operate on; any other path
# is rejected (see _CleanDirectory) to avoid accidental mass deletion.
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])

# Maximum validity for SSL certificates, in seconds (one week)
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60

# Base names for X509 key/certificate files
# NOTE(review): presumably used inside per-transfer X509 directories —
# confirm against the code that consumes them (not visible here)
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"

# Base names for import/export ("IES") status/pid/CA files
# NOTE(review): consumers not visible in this chunk — verify usage
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
109

    
110

    
111
class RPCFail(Exception):
  """Exception signalling an RPC-level failure.

  The single argument carries the error message which is reported
  back to the caller of the RPC.

  """
117

    
118

    
119
def _GetInstReasonFilename(instance_name):
  """Build the path of the file holding an instance's state-change reason.

  @type instance_name: string
  @param instance_name: name of the instance the file belongs to
  @rtype: string
  @return: full path of the reason file

  """
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)
129

    
130

    
131
def _StoreInstReasonTrail(instance_name, trail):
  """Write an instance's state-change reason trail to file, as JSON.

  The file lives in the instance reason directory and is named after the
  instance (see L{_GetInstReasonFilename}); the exact location therefore
  depends on the cluster configuration chosen at deploy time.

  @type instance_name: string
  @param instance_name: the name of the instance
  @rtype: None

  """
  reason_path = _GetInstReasonFilename(instance_name)
  utils.WriteFile(reason_path, data=serializer.DumpJson(trail))
145

    
146

    
147
def _Fail(msg, *args, **kwargs):
  """Log an error and raise an RPCFail exception.

  The ganeti daemon handles this exception specially and turns it into a
  'failed' return type, so this is a convenient shortcut for logging an
  error and reporting it back to the master daemon.

  @type msg: string
  @param msg: the text of the exception; interpolated with C{args} if any
  @raise RPCFail: always

  """
  if args:
    msg = msg % args
  # Logging can be suppressed by passing log=False
  if kwargs.get("log", True):
    if kwargs.get("exc"):
      # Attach the active exception's traceback to the log entry
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
168

    
169

    
170
def _GetConfig():
  """Return a fresh L{ssconf.SimpleStore} instance.

  @rtype: L{ssconf.SimpleStore}
  @return: a new SimpleStore

  """
  return ssconf.SimpleStore()
178

    
179

    
180
def _GetSshRunner(cluster_name):
  """Return an L{ssh.SshRunner} for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, required by the SshRunner
      constructor
  @rtype: L{ssh.SshRunner}
  @return: a new SshRunner

  """
  return ssh.SshRunner(cluster_name)
191

    
192

    
193
def _Decompress(data):
  """Unpack data compressed by the RPC client.

  @type data: list or tuple
  @param data: two-element (encoding, content) sequence as produced by
      the RPC client
  @rtype: str
  @return: the decompressed payload

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    # Content was transferred verbatim
    return content
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
211

    
212

    
213
def _CleanDirectory(path, exclude=None):
  """Remove all regular files from a directory.

  @type path: str
  @param path: the directory to clean; must be one of the whitelisted
      clean targets (L{_ALLOWED_CLEAN_DIRS})
  @type exclude: list
  @param exclude: optional list of file paths to keep, defaults to the
      empty list

  """
  # Refuse anything outside the whitelist to avoid accidental deletion
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return

  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths so the membership test below matches
    exclude = [os.path.normpath(entry) for entry in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    # Only plain files are removed; symlinks and directories are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
241

    
242

    
243
def _BuildUploadFileList():
  """Compute the set of files accepted by the L{UploadFile} function.

  This is abstracted into a function so the set is built exactly once,
  at module import time.

  """
  upload_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  # Every hypervisor contributes its own ancillary files
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    upload_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in upload_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(upload_files)


#: Files accepted by the UploadFile RPC, computed once at import time
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
273

    
274

    
275
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  Cleans the queue directory (keeping the queue lock file) and the job
  archive directory.

  @rtype: None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
284

    
285

    
286
def GetMasterNodeName():
  """Returns the master node name.

  @rtype: string
  @return: name of the master node
  @raise RPCFail: in case of errors

  """
  try:
    return _GetConfig().GetMasterNode()
  # "except X as err" instead of the Python-2-only "except X, err" form;
  # valid since Python 2.6 and required on Python 3
  except errors.ConfigurationError as err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
298

    
299

    
300
def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      # Bind the decorated function's arguments now, but defer building
      # the environment until the hooks actually run
      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      # A failing pre-hook raises and prevents fn from running at all
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      # Post-hooks run only if fn completed without raising
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
334

    
335

    
336
def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Build the environment dictionary for the master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    _RunLocalHooks decorator)

  """
  # pylint: disable=W0613
  ip_version = netutils.IPAddress.GetVersionFromAddressFamily(
    master_params.ip_family)
  return {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ip_version),
  }
357

    
358

    
359
def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  setup_script = (pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
                  if use_external_mip_script
                  else pathutils.DEFAULT_MASTER_SETUP_SCRIPT)

  # reset_env=True: run with only the master IP variables in the env
  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)
386

    
387

    
388
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Bring up the IP address of the master daemon.

  Runs the master setup script with the "start" action, surrounded by
  the master-ip-turnup hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)
403

    
404

    
405
def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """
  # --yes-do-it skips masterd's interactive confirmation when voting
  # is disabled
  extra_args = "--no-voting --yes-do-it" if no_voting else ""

  env = {
    "EXTRA_MASTERD_ARGS": extra_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)
431

    
432

    
433
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Bring down the master IP on this node.

  Runs the master setup script with the "stop" action, surrounded by
  the master-ip-turndown hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)
448

    
449

    
450
def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
  A failure is only logged, not reported back to the caller.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)
466

    
467

    
468
def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  Adds the master IP with the new netmask, then removes the entry with
  the old netmask. No-op when both netmasks are equal.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  label = "%s:0" % master_netdev

  # First add the address with the new netmask...
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label", label])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  # ...then remove the entry carrying the old netmask
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label", label])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")
497

    
498

    
499
def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry
  @raise RPCFail: if the mode/parameter combination is invalid

  """
  # BUG FIX: the original called RPCFail(...) — which only *constructs*
  # the exception object and discards it — so invalid calls fell through
  # silently. _Fail logs the message and raises RPCFail.
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")
519

    
520

    
521
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  # Wipe node-local state: data dir, crypto keys and the job queue
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      # Remove this node's public key from authorized_keys...
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      # ...and delete the key pair itself
      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      # Best effort: log and continue with the rest of the cleanup
      logging.exception("Error while processing ssh files")

  try:
    # Remove cluster-wide secrets held on this node
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    # Deliberately broad: secret removal is best effort
    logging.exception("Error while removing cluster secrets")

  # Stop confd; a failure is only logged, not fatal
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")
565

    
566

    
567
def _CheckStorageParams(params, num_params):
  """Performs sanity checks for storage parameters.

  @type params: list
  @param params: list of storage parameters
  @type num_params: int
  @param num_params: expected number of parameters
  @raise errors.ProgrammerError: if the parameters are missing, not a
    list, or of unexpected length

  """
  if params is None:
    raise errors.ProgrammerError("No storage parameters for storage"
                                 " reporting is provided.")
  if not isinstance(params, list):
    raise errors.ProgrammerError("The storage parameters are not of type"
                                 " list: '%s'" % params)
  if len(params) != num_params:
    # Fixed missing space in the concatenated message (was "ofstorage")
    raise errors.ProgrammerError("Did not receive the expected number of"
                                 " storage parameters: expected %s,"
                                 " received '%s'" % (num_params, len(params)))
586

    
587

    
588
def _CheckLvmStorageParams(params):
  """Performs sanity check for the 'exclusive storage' flag.

  @see: C{_CheckStorageParams}

  """
  _CheckStorageParams(params, 1)
  excl_stor = params[0]
  # The single parameter must be the boolean 'exclusive storage' flag
  if not isinstance(excl_stor, bool):
    raise errors.ProgrammerError("Exclusive storage parameter is not"
                                 " boolean: '%s'." % excl_stor)
  return excl_stor
600

    
601

    
602
def _GetLvmVgSpaceInfo(name, params):
  """Wrapper around C{_GetVgInfo} which checks the storage parameters.

  @type name: string
  @param name: name of the volume group
  @type params: list
  @param params: storage parameters; expected to contain only the
    'exclusive storage' flag

  """
  return _GetVgInfo(name, _CheckLvmStorageParams(params))
614

    
615

    
616
def _GetVgInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVGInfo):
  """Retrieves information about a LVM volume group.

  @type name: string
  @param name: name of the volume group
  @type excl_stor: bool
  @param excl_stor: exclusive storage flag, passed through to C{info_fn}
  @rtype: dict
  @return: storage unit description with "type", "name", "storage_free"
      and "storage_size" keys; the sizes are C{None} when the VG could
      not be queried

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = info_fn([name], excl_stor)
  if vginfo:
    # Round to whole MiB values
    (vg_free, vg_size) = (int(round(vginfo[0][0], 0)),
                          int(round(vginfo[0][1], 0)))
  else:
    (vg_free, vg_size) = (None, None)

  return {
    "type": constants.ST_LVM_VG,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }
636

    
637

    
638
def _GetLvmPvSpaceInfo(name, params):
  """Wrapper around C{_GetVgSpindlesInfo} with sanity checks.

  @see: C{_GetLvmVgSpaceInfo}

  """
  return _GetVgSpindlesInfo(name, _CheckLvmStorageParams(params))
646

    
647

    
648
def _GetVgSpindlesInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVgSpindlesInfo):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary with keys "type", "name", "storage_free" and
      "storage_size", holding the storage type, the VG name, and the free
      and total spindles respectively (the original docstring wrongly
      promised "vg_free"/"vg_size" keys)

  """
  if excl_stor:
    (vg_free, vg_size) = info_fn(name)
  else:
    # Spindles are only tracked when exclusive storage is enabled
    vg_free = 0
    vg_size = 0
  return {
    "type": constants.ST_LVM_PV,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }
672

    
673

    
674
def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total number of ram in MiB
    - hv_version: the hypervisor version, if available

  @type hvparams: dict of string
  @param hvparams: the hypervisor's hvparams

  """
  hv = get_hv_fn(name)
  return hv.GetNodeInfo(hvparams=hvparams)
691

    
692

    
693
def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information for all hypervisors.

  See C{_GetHvInfo} for information on the output.

  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams

  """
  if hv_specs is None:
    return None

  return [_GetHvInfo(hvname, hvparams, get_hv_fn)
          for (hvname, hvparams) in hv_specs]
709

    
710

    
711
def _GetNamedNodeInfo(names, fn):
712
  """Calls C{fn} for all names in C{names} and returns a dictionary.
713

714
  @rtype: None or dict
715

716
  """
717
  if names is None:
718
    return None
719
  else:
720
    return map(fn, names)
721

    
722

    
723
def GetNodeInfo(storage_units, hv_specs):
  """Gives back a hash with different information about the node.

  @type storage_units: list of tuples (string, string, list)
  @param storage_units: List of tuples (storage unit, identifier, parameters) to
    ask for disk space information. In case of lvm-vg, the identifier is
    the VG name. The parameters can contain additional, storage-type-specific
    parameters, for example exclusive storage for lvm storage.
  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  # Star-unpack each (storage_type, storage_key, storage_params) tuple;
  # the original used a tuple-parameter lambda, which is Python-2-only
  # syntax (removed by PEP 3113)
  storage_info = _GetNamedNodeInfo(
    storage_units,
    lambda unit: _ApplyStorageInfoFunction(*unit))
  hv_info = _GetHvInfoAll(hv_specs)
  return (bootid, storage_info, hv_info)
745

    
746

    
747
def _GetFileStorageSpaceInfo(path, params):
  """Wrapper around filestorage.GetSpaceInfo.

  The purpose of this wrapper is to call filestorage.GetFileStorageSpaceInfo
  and ignore the *args parameter to not leak it into the filestorage
  module's code.

  @see: C{filestorage.GetFileStorageSpaceInfo} for description of the
    parameters.

  """
  # File storage accepts no extra storage parameters
  _CheckStorageParams(params, 0)
  return filestorage.GetFileStorageSpaceInfo(path)
760

    
761

    
762
# FIXME: implement storage reporting for all missing storage types.
#: Maps each storage type to its space-reporting function; C{None} marks
#: types without space reporting, for which L{_ApplyStorageInfoFunction}
#: raises C{NotImplementedError}.
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: _GetFileStorageSpaceInfo,
  constants.ST_LVM_PV: _GetLvmPvSpaceInfo,
  constants.ST_LVM_VG: _GetLvmVgSpaceInfo,
  constants.ST_SHARED_FILE: None,
  constants.ST_RADOS: None,
}
773

    
774

    
775
def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the results of the application of the storage space function (see
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
    storage type
  @raises NotImplementedError: for storage types who don't support space
    reporting yet
  """
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
  if fn is None:
    # No space reporting implemented for this storage type yet
    raise NotImplementedError
  return fn(storage_key, *args)
799

    
800

    
801
def _CheckExclusivePvs(pvi_list):
802
  """Check that PVs are not shared among LVs
803

804
  @type pvi_list: list of L{objects.LvmPvInfo} objects
805
  @param pvi_list: information about the PVs
806

807
  @rtype: list of tuples (string, list of strings)
808
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])
809

810
  """
811
  res = []
812
  for pvi in pvi_list:
813
    if len(pvi.lv_list) > 1:
814
      res.append((pvi.name, pvi.lv_list))
815
  return res
816

    
817

    
818
def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
                       get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hypervisor. Appends the results to the 'results' list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      hvparams = all_hvparams[hv_name]
      try:
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
      # "as err" instead of the Python-2-only comma form (valid 2.6+)
      except errors.HypervisorError as err:
        val = "Error while checking hypervisor: %s" % str(err)
      result[constants.NV_HYPERVISOR][hv_name] = val

    
849
def _VerifyHvparams(what, vm_capable, result,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hvparams. Appends the results to the 'results' list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HVPARAMS in what:
    result[constants.NV_HVPARAMS] = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        get_hv_fn(hv_name).ValidateParameters(hvparms)
      # "as err" instead of the Python-2-only comma form (valid 2.6+)
      except errors.HypervisorError as err:
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))
875

    
876

    
877
def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
  """Verifies the instance list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
                            all_hvparams=all_hvparams)
    # "as err" instead of the Python-2-only comma form (valid 2.6+)
    except RPCFail as err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val
899

    
900

    
901
def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
  """Collects hypervisor node information, if requested.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_HVINFO not in what or not vm_capable:
    return
  hv_name = what[constants.NV_HVINFO]
  hyper = hypervisor.GetHypervisor(hv_name)
  result[constants.NV_HVINFO] = \
    hyper.GetNodeInfo(hvparams=all_hvparams[hv_name])
920

    
921

    
922
def _VerifyClientCertificate(cert_file=pathutils.NODED_CLIENT_CERT_FILE):
  """Verify the existence and validity of the client SSL certificate.

  @type cert_file: string
  @param cert_file: path of the client certificate to verify
  @rtype: tuple of (errcode, string)
  @return: on failure, an (error code, message) tuple; on success,
      C{(None, digest)} where the digest is meant to be compared
      against the cluster configuration

  """
  create_cert_cmd = "gnt-cluster renew-crypto --new-node-certificates"
  if not os.path.exists(cert_file):
    # FIX: added the missing space between the two string fragments;
    # previously the message read "...to createclient certificates..."
    return (constants.CV_ERROR,
            "The client certificate does not exist. Run '%s' to create"
            " client certificates for all nodes." % create_cert_cmd)

  (errcode, msg) = utils.VerifyCertificate(cert_file)
  if errcode is not None:
    return (errcode, msg)
  else:
    # if everything is fine, we return the digest to be compared to the config
    return (None, utils.GetCertificateDigest(cert_filename=cert_file))
938

    
939

    
940
def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
  @type node_groups: a dict of strings
  @param node_groups: node _names_ mapped to their group uuids (it's enough to
      have only those nodes that are in `what["nodelist"]`)
  @type groups_cfg: a dict of dict of strings
  @param groups_cfg: a dictionary mapping group uuids to their configuration
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
  _VerifyHvparams(what, vm_capable, result)

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_CLIENT_CERT in what:
    result[constants.NV_CLIENT_CERT] = _VerifyClientCertificate()

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      params = groups_cfg.get(node_groups.get(node))
      if params is None:
        # FIX: a node whose group is missing from groups_cfg/node_groups
        # used to raise a TypeError here; report it as a failure instead
        val[node] = ("Could not find group configuration for node %s" %
                     node)
        continue
      ssh_port = params["ndparams"].get(constants.ND_SSH_PORT)
      logging.debug("Ssh port %s (None = default) for node %s",
                    str(ssh_port), node)
      success, message = _GetSshRunner(cluster_name). \
                            VerifyNodeHostname(node, ssh_port)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError as err:
        tmp.append("error stating out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail as err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  _VerifyInstanceList(what, vm_capable, result, all_hvparams)

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError as err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError as err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError as err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_ACCEPTED_STORAGE_PATHS) == my_name:
    result[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        filestorage.ComputeWrongFileStoragePaths()

  if what.get(constants.NV_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_FILE_STORAGE_PATH] = pathresult

  if what.get(constants.NV_SHARED_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_SHARED_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_SHARED_FILE_STORAGE_PATH] = pathresult

  return result
1173

    
1174

    
1175
def GetCryptoTokens(token_requests):
  """Perform actions on the node's cryptographic tokens.

  Token types can be 'ssl' or 'ssh'. So far only some actions are implemented
  for 'ssl'. Action 'get' returns the digest of the public client ssl
  certificate. Action 'create' creates a new client certificate and private key
  and also returns the digest of the certificate. The third parameter of a
  token request are optional parameters for the actions, so far only the
  filename is supported.

  @type token_requests: list of tuples of (string, string, dict), where the
    first string is in constants.CRYPTO_TYPES, the second in
    constants.CRYPTO_ACTIONS. The third parameter is a dictionary of string
    to string.
  @param token_requests: list of requests of cryptographic tokens and actions
    to perform on them. The actions come with a dictionary of options.
  @rtype: list of tuples (string, string)
  @return: list of tuples of the token type and the public crypto token

  """
  valid_cert_files = [pathutils.NODED_CERT_FILE,
                      pathutils.NODED_CLIENT_CERT_FILE,
                      pathutils.NODED_CLIENT_CERT_FILE_TMP]
  default_cert_file = pathutils.NODED_CLIENT_CERT_FILE

  tokens = []
  for (token_type, action, options) in token_requests:
    # reject unknown token types and actions up front
    if token_type not in constants.CRYPTO_TYPES:
      raise errors.ProgrammerError("Token type '%s' not supported." %
                                   token_type)
    if action not in constants.CRYPTO_ACTIONS:
      raise errors.ProgrammerError("Action '%s' is not supported." %
                                   action)
    if token_type != constants.CRYPTO_TYPE_SSL_DIGEST:
      # only SSL digests are implemented so far
      continue
    if action == constants.CRYPTO_ACTION_CREATE:
      cert_filename = None
      if options:
        cert_filename = options.get(constants.CRYPTO_OPTION_CERT_FILE)
      if not cert_filename:
        cert_filename = default_cert_file
      # For security reason, we don't allow arbitrary filenames
      if cert_filename not in valid_cert_files:
        raise errors.ProgrammerError(
          "The certificate file name path '%s' is not allowed." %
          cert_filename)
      utils.GenerateNewSslCert(
        True, cert_filename,
        "Create new client SSL certificate in %s." % cert_filename)
      digest = utils.GetCertificateDigest(cert_filename=cert_filename)
      tokens.append((token_type, digest))
    elif action == constants.CRYPTO_ACTION_GET:
      tokens.append((token_type, utils.GetCertificateDigest()))
  return tokens
1229

    
1230

    
1231
def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    # only paths under /dev are considered
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError as err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      # FIX: use explicit floor division so the documented integer MiB
      # value is preserved under true division (Python 3 / __future__)
      size = int(result.stdout) // (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs
1267

    
1268

    
1269
def GetVolumeList(vg_names):
  """Compute the list of logical volumes and their sizes.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  sep = "|"
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] +
                        list(vg_names or []))
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  lvs = {}
  for raw_line in result.stdout.splitlines():
    line = raw_line.strip()
    match = _LVSLINE_REGEX.match(line)
    if match is None:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, lv_name, size, attr = match.groups()
    # "virtual" volumes don't really hold data, so we don't want to
    # report them as existing
    if attr[0] == "v":
      continue
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    lvs["%s/%s" % (vg_name, lv_name)] = (size, inactive, online)

  return lvs
1313

    
1314

    
1315
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  # thin wrapper kept for the RPC layer
  vgs = utils.ListVolumeGroups()
  return vgs
1324

    
1325

    
1326
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def expand_devices(devs_field):
    # each entry looks like "/dev/sda1(0)"; drop the extent offset
    return [entry.split("(")[0] for entry in devs_field.split(",")]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") < 3:
      logging.warning("Strange line in the output from lvs: '%s'", line)
      continue
    fields = [field.strip() for field in line.split("|")]
    all_devs.extend({"name": fields[0], "size": fields[1],
                     "dev": dev, "vg": fields[3]}
                    for dev in expand_devices(fields[2]))
  return all_devs
1370

    
1371

    
1372
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
1386

    
1387

    
1388
def GetInstanceListForHypervisor(hname, hvparams=None,
                                 get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances of the given hypervisor.

  @type hname: string
  @param hname: name of the hypervisor
  @type hvparams: dict of strings
  @param hvparams: hypervisor parameters for the given hypervisor
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  try:
    hyper = get_hv_fn(hname)
    instance_names = list(hyper.ListInstances(hvparams=hvparams))
  except errors.HypervisorError as err:
    _Fail("Error enumerating instances (hypervisor %s): %s",
          hname, err, exc=True)
  return instance_names
1415

    
1416

    
1417
def GetInstanceList(hypervisor_list, all_hvparams=None,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor types to respective
    cluster-wide hypervisor parameters
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  instances = []
  for hv_name in hypervisor_list:
    # query each hypervisor with its own parameter set
    per_hv = GetInstanceListForHypervisor(hv_name,
                                          hvparams=all_hvparams[hv_name],
                                          get_hv_fn=get_hv_fn)
    instances.extend(per_hv)
  return instances
1442

    
1443

    
1444
def GetInstanceInfo(instance, hname, hvparams=None):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance
  @type hvparams: dict of strings
  @param hvparams: the instance's hvparams

  @rtype: dict
  @return: empty if the instance is unknown, otherwise a dictionary with:
      - memory: memory size of instance (int)
      - state: state of instance (HvInstanceState)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  hyper = hypervisor.GetHypervisor(hname)
  iinfo = hyper.GetInstanceInfo(instance, hvparams=hvparams)
  if iinfo is None:
    return {}
  # unpack the (name, id, memory, vcpus, state, time) tuple
  return {
    "memory": iinfo[2],
    "vcpus": iinfo[3],
    "state": iinfo[4],
    "time": iinfo[5],
    }
1473

    
1474

    
1475
def GetInstanceMigratable(instance):
  """Computes whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: None
  @return: None; fails via L{_Fail} (i.e. raises) if the instance is
      not running on this node.  Missing disk symlinks are only logged
      as warnings, not treated as errors.

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances(instance.hvparams):
    _Fail("Instance %s is not running", iname)

  # Missing symlinks would break the disks on the migration target,
  # but here they are merely reported.
  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)
1497

    
1498

    
1499
def GetAllInstancesInfo(hypervisor_list, all_hvparams):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: mapping of hypervisor names to hvparams

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}
  for hv_name in hypervisor_list:
    hyper = hypervisor.GetHypervisor(hv_name)
    iinfo = hyper.GetAllInstancesInfo(all_hvparams[hv_name])
    if not iinfo:
      continue
    for name, _, memory, vcpus, state, times in iinfo:
      value = {
        "memory": memory,
        "vcpus": vcpus,
        "state": state,
        "time": times,
        }
      previous = output.get(name)
      if previous is not None:
        # we only check static parameters, like memory and vcpus,
        # and not state and time which can change between the
        # invocations of the different hypervisors
        for key in ("memory", "vcpus"):
          if value[key] != previous[key]:
            _Fail("Instance %s is running twice"
                  " with different parameters", name)
      output[name] = value

  return output
1542

    
1543

    
1544
def GetInstanceConsoleInfo(instance_param_dict,
                           get_hv_fn=hypervisor.GetHypervisor):
  """Gather data about the console access of a set of instances of this node.

  This function assumes that the caller already knows which instances are on
  this node, by calling a function such as L{GetAllInstancesInfo} or
  L{GetInstanceList}.

  For every instance, a large amount of configuration data needs to be
  provided to the hypervisor interface in order to receive the console
  information. Whether this could or should be cut down can be discussed.
  The information is provided in a dictionary indexed by instance name,
  allowing any number of instance queries to be done.

  @type instance_param_dict: dict of string to tuple of dictionaries, where the
    dictionaries represent: L{objects.Instance}, L{objects.Node},
    L{objects.NodeGroup}, HvParams, BeParams
  @param instance_param_dict: mapping of instance name to parameters necessary
    for console information retrieval

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - instance: instance name
      - kind: console kind
      - message: used with kind == CONS_MESSAGE, indicates console to be
                 unavailable, supplies error message
      - host: host to connect to
      - port: port to use
      - user: user for login
      - command: the command, broken into parts as an array
      - display: unknown, potentially unused?

  """
  output = {}
  for inst_name, params in instance_param_dict.items():
    # rebuild the config objects from their serialized form
    instance = objects.Instance.FromDict(params["instance"])
    primary_node = objects.Node.FromDict(params["node"])
    node_group = objects.NodeGroup.FromDict(params["group"])
    hvparams = params["hvParams"]
    beparams = params["beParams"]

    hyper = get_hv_fn(instance.hypervisor)
    console = hyper.GetInstanceConsole(instance, primary_node, node_group,
                                       hvparams, beparams)
    output[inst_name] = console.ToDict()

  return output
1595

    
1596

    
1597
def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    suffix = "-%s" % component
  else:
    suffix = ""
  basename = ("%s-%s-%s%s-%s.log" %
              (kind, os_name, instance, suffix,
               utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, basename)
1623

    
1624

    
1625
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if not result.failed:
    return

  # the create script failed: log the details and report the tail of
  # its log file back to the caller
  logging.error("os create command '%s' returned error: %s, logfile: %s,"
                " output: %s", result.cmd, result.fail_reason, logfile,
                result.output)
  lines = [utils.SafeEncode(val)
           for val in utils.TailFile(logfile, lines=20)]
  _Fail("OS create script failed (%s), last lines in the"
        " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1655

    
1656

    
1657
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if not result.failed:
    return

  # rename failed: log it and hand the log file tail to the caller
  logging.error("os create command '%s' returned error: %s output: %s",
                result.cmd, result.fail_reason, result.output)
  lines = [utils.SafeEncode(val)
           for val in utils.TailFile(logfile, lines=20)]
  _Fail("OS rename script failed (%s), last lines in the"
        " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1688

    
1689

    
1690
def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Compute the symlink path for an instance's block device.

  @type instance_name: string
  @param instance_name: name of the owning instance
  @type idx: int
  @param idx: the disk index
  @param _dir: base directory override, only used by tests
  @rtype: string
  @return: the path of the disk symlink

  """
  base_dir = pathutils.DISK_LINKS_DIR if _dir is None else _dir
  link_basename = "%s%s%s" % (instance_name, constants.DISK_SEPARATOR, idx)
  return utils.PathJoin(base_dir, link_basename)
1700

    
1701

    
1702
def _SymlinkBlockDev(instance_name, device_path, idx):
1703
  """Set up symlinks to a instance's block device.
1704

1705
  This is an auxiliary function run when an instance is start (on the primary
1706
  node) or when an instance is migrated (on the target node).
1707

1708

1709
  @param instance_name: the name of the target instance
1710
  @param device_path: path of the physical block device, on the node
1711
  @param idx: the disk index
1712
  @return: absolute path to the disk's symlink
1713

1714
  """
1715
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1716
  try:
1717
    os.symlink(device_path, link_name)
1718
  except OSError, err:
1719
    if err.errno == errno.EEXIST:
1720
      if (not os.path.islink(link_name) or
1721
          os.readlink(link_name) != device_path):
1722
        os.remove(link_name)
1723
        os.symlink(device_path, link_name)
1724
    else:
1725
      raise
1726

    
1727
  return link_name
1728

    
1729

    
1730
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @param instance_name: name of the owning instance
  @param disks: the instance's disks (only their count is used)

  """
  for idx in range(len(disks)):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      # best-effort cleanup: log the problem and continue with the rest
      logging.exception("Can't remove symlink '%s'", link_name)
1741

    
1742

    
1743
def _CalculateDeviceURI(instance, disk, device):
  """Get the URI for the device.

  @type instance: L{objects.Instance}
  @param instance: the instance which disk belongs to
  @type disk: L{objects.Disk}
  @param disk: the target disk object
  @type device: L{bdev.BlockDev}
  @param device: the corresponding BlockDevice
  @rtype: string
  @return: the device uri if any else None

  """
  access_mode = disk.params.get(constants.LDP_ACCESS,
                                constants.DISK_KERNELSPACE)
  if access_mode != constants.DISK_USERSPACE:
    # kernelspace access needs no URI
    return None
  # This can raise errors.BlockDeviceError
  return device.GetUserspaceAccessUri(instance.hypervisor)
1763

    
1764

    
1765
def _GatherAndLinkBlockDevs(instance):
1766
  """Set up an instance's block device(s).
1767

1768
  This is run on the primary node at instance startup. The block
1769
  devices must be already assembled.
1770

1771
  @type instance: L{objects.Instance}
1772
  @param instance: the instance whose disks we should assemble
1773
  @rtype: list
1774
  @return: list of (disk_object, link_name, drive_uri)
1775

1776
  """
1777
  block_devices = []
1778
  for idx, disk in enumerate(instance.disks):
1779
    device = _RecursiveFindBD(disk)
1780
    if device is None:
1781
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1782
                                    str(disk))
1783
    device.Open()
1784
    try:
1785
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1786
    except OSError, e:
1787
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1788
                                    e.strerror)
1789
    uri = _CalculateDeviceURI(instance, disk, device)
1790

    
1791
    block_devices.append((disk, link_name, uri))
1792

    
1793
  return block_devices
1794

    
1795

    
1796
def StartInstance(instance, startup_paused, reason, store_reason=True):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @type reason: list of reasons
  @param reason: the reason trail for this startup
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  # starting an already-running instance is a no-op, not an error
  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
    if store_reason:
      _StoreInstReasonTrail(instance.name, reason)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    # disks were already linked but the hypervisor failed to start the
    # instance, so undo the symlinks before reporting the failure
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)
1828

    
1829

    
1830
def InstanceShutdown(instance, timeout, reason, store_reason=True):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this shutdown
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  # shutting down a stopped instance is a no-op
  if instance.name not in hyper.ListInstances(instance.hvparams):
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    # Stateful callable for utils.Retry: ask the hypervisor to stop the
    # instance until it disappears from the hypervisor's list; from the
    # second attempt on, StopInstance is told this is a retry.
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances(instance.hvparams):
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
        if store_reason:
          _StoreInstReasonTrail(instance.name, reason)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances(instance.hvparams):
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      # instance still listed: tell utils.Retry to call us again
      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances(instance.hvparams):
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    # give the hypervisor a moment to act on the forced stop
    time.sleep(1)

    if iname in hyper.ListInstances(instance.hvparams):
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    # cleanup is best-effort; the instance is already down
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)
1903

    
1904

    
1905
def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this reboot
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      # hard reboot = stop + start; the reason trail is stored once here
      # instead of by the two inner calls (hence store_reason=False)
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
      result = StartInstance(instance, False, reason, store_reason=False)
      _StoreInstReasonTrail(instance.name, reason)
      return result
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)
1950

    
1951

    
1952
def InstanceBalloonMemory(instance, memory):
1953
  """Resize an instance's memory.
1954

1955
  @type instance: L{objects.Instance}
1956
  @param instance: the instance object
1957
  @type memory: int
1958
  @param memory: new memory amount in MB
1959
  @rtype: None
1960

1961
  """
1962
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1963
  running = hyper.ListInstances(instance.hvparams)
1964
  if instance.name not in running:
1965
    logging.info("Instance %s is not running, cannot balloon", instance.name)
1966
    return
1967
  try:
1968
    hyper.BalloonInstanceMemory(instance, memory)
1969
  except errors.HypervisorError, err:
1970
    _Fail("Failed to balloon instance memory: %s", err, exc=True)
1971

    
1972

    
1973
def MigrationInfo(instance):
1974
  """Gather information about an instance to be migrated.
1975

1976
  @type instance: L{objects.Instance}
1977
  @param instance: the instance definition
1978

1979
  """
1980
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1981
  try:
1982
    info = hyper.MigrationInfo(instance)
1983
  except errors.HypervisorError, err:
1984
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1985
  return info
1986

    
1987

    
1988
def AcceptInstance(instance, info, target):
1989
  """Prepare the node to accept an instance.
1990

1991
  @type instance: L{objects.Instance}
1992
  @param instance: the instance definition
1993
  @type info: string/data (opaque)
1994
  @param info: migration information, from the source node
1995
  @type target: string
1996
  @param target: target host (usually ip), on this node
1997

1998
  """
1999
  # TODO: why is this required only for DTS_EXT_MIRROR?
2000
  if instance.disk_template in constants.DTS_EXT_MIRROR:
2001
    # Create the symlinks, as the disks are not active
2002
    # in any way
2003
    try:
2004
      _GatherAndLinkBlockDevs(instance)
2005
    except errors.BlockDeviceError, err:
2006
      _Fail("Block device error: %s", err, exc=True)
2007

    
2008
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2009
  try:
2010
    hyper.AcceptInstance(instance, info, target)
2011
  except errors.HypervisorError, err:
2012
    if instance.disk_template in constants.DTS_EXT_MIRROR:
2013
      _RemoveBlockDevLinks(instance.name, instance.disks)
2014
    _Fail("Failed to accept instance: %s", err, exc=True)
2015

    
2016

    
2017
def FinalizeMigrationDst(instance, info, success):
2018
  """Finalize any preparation to accept an instance.
2019

2020
  @type instance: L{objects.Instance}
2021
  @param instance: the instance definition
2022
  @type info: string/data (opaque)
2023
  @param info: migration information, from the source node
2024
  @type success: boolean
2025
  @param success: whether the migration was a success or a failure
2026

2027
  """
2028
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2029
  try:
2030
    hyper.FinalizeMigrationDst(instance, info, success)
2031
  except errors.HypervisorError, err:
2032
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
2033

    
2034

    
2035
def MigrateInstance(cluster_name, instance, target, live):
2036
  """Migrates an instance to another node.
2037

2038
  @type cluster_name: string
2039
  @param cluster_name: name of the cluster
2040
  @type instance: L{objects.Instance}
2041
  @param instance: the instance definition
2042
  @type target: string
2043
  @param target: the target node name
2044
  @type live: boolean
2045
  @param live: whether the migration should be done live or not (the
2046
      interpretation of this parameter is left to the hypervisor)
2047
  @raise RPCFail: if migration fails for some reason
2048

2049
  """
2050
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2051

    
2052
  try:
2053
    hyper.MigrateInstance(cluster_name, instance, target, live)
2054
  except errors.HypervisorError, err:
2055
    _Fail("Failed to migrate instance: %s", err, exc=True)
2056

    
2057

    
2058
def FinalizeMigrationSource(instance, success, live):
2059
  """Finalize the instance migration on the source node.
2060

2061
  @type instance: L{objects.Instance}
2062
  @param instance: the instance definition of the migrated instance
2063
  @type success: bool
2064
  @param success: whether the migration succeeded or not
2065
  @type live: bool
2066
  @param live: whether the user requested a live migration or not
2067
  @raise RPCFail: If the execution fails for some reason
2068

2069
  """
2070
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2071

    
2072
  try:
2073
    hyper.FinalizeMigrationSource(instance, success, live)
2074
  except Exception, err:  # pylint: disable=W0703
2075
    _Fail("Failed to finalize the migration on the source node: %s", err,
2076
          exc=True)
2077

    
2078

    
2079
def GetMigrationStatus(instance):
2080
  """Get the migration status
2081

2082
  @type instance: L{objects.Instance}
2083
  @param instance: the instance that is being migrated
2084
  @rtype: L{objects.MigrationStatus}
2085
  @return: the status of the current migration (one of
2086
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
2087
           progress info that can be retrieved from the hypervisor
2088
  @raise RPCFail: If the migration status cannot be retrieved
2089

2090
  """
2091
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2092
  try:
2093
    return hyper.GetMigrationStatus(instance)
2094
  except Exception, err:  # pylint: disable=W0703
2095
    _Fail("Failed to get migration status: %s", err, exc=True)
2096

    
2097

    
2098
def HotplugDevice(instance, action, dev_type, device, extra, seq):
2099
  """Hotplug a device
2100

2101
  Hotplug is currently supported only for KVM Hypervisor.
2102
  @type instance: L{objects.Instance}
2103
  @param instance: the instance to which we hotplug a device
2104
  @type action: string
2105
  @param action: the hotplug action to perform
2106
  @type dev_type: string
2107
  @param dev_type: the device type to hotplug
2108
  @type device: either L{objects.NIC} or L{objects.Disk}
2109
  @param device: the device object to hotplug
2110
  @type extra: string
2111
  @param extra: extra info used by hotplug code (e.g. disk link)
2112
  @type seq: int
2113
  @param seq: the index of the device from master perspective
2114
  @raise RPCFail: in case instance does not have KVM hypervisor
2115

2116
  """
2117
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2118
  try:
2119
    hyper.VerifyHotplugSupport(instance, action, dev_type)
2120
  except errors.HotplugError, err:
2121
    _Fail("Hotplug is not supported: %s", err)
2122

    
2123
  if action == constants.HOTPLUG_ACTION_ADD:
2124
    fn = hyper.HotAddDevice
2125
  elif action == constants.HOTPLUG_ACTION_REMOVE:
2126
    fn = hyper.HotDelDevice
2127
  elif action == constants.HOTPLUG_ACTION_MODIFY:
2128
    fn = hyper.HotModDevice
2129
  else:
2130
    assert action in constants.HOTPLUG_ALL_ACTIONS
2131

    
2132
  return fn(instance, dev_type, device, extra, seq)
2133

    
2134

    
2135
def HotplugSupported(instance):
2136
  """Checks if hotplug is generally supported.
2137

2138
  """
2139
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2140
  try:
2141
    hyper.HotplugSupported(instance)
2142
  except errors.HotplugError, err:
2143
    _Fail("Hotplug is not supported: %s", err)
2144

    
2145

    
2146
def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    # assemble (and possibly open) all children first, since the parent
    # device cannot be created without them
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  # propagate the creation info (e.g. LVM tags) to the physical device
  device.SetInfo(info)

  return device.unique_id
2209

    
2210

    
2211
def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write
  @raise RPCFail: if the "dd" command fails

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  wipe_cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
              "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
              "count=%d" % size]
  result = utils.RunCmd(wipe_cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
2232

    
2233

    
2234
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    bd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    bd = None

  if not bd:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # cross-verify the requested range against the device size
  if offset < 0:
    _Fail("Negative offset")
  elif size < 0:
    _Fail("Negative size")
  elif offset > bd.size:
    _Fail("Offset is bigger than device size")
  elif (offset + size) > bd.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(bd.dev_path, offset, size)
2264

    
2265

    
2266
def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disks object we want to pause/resume
  @type pause: bool
  @param pause: Wheater to pause or resume
  @rtype: list
  @return: one (status, message) tuple per disk; message is None on
      success

  """
  results = []
  for disk in disks:
    try:
      bd = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      bd = None

    if not bd:
      results.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    status = bd.PauseResumeSync(pause)
    if status:
      results.append((status, None))
    else:
      verb = "Pause" if pause else "Resume"
      results.append((status, "%s for device %s failed" % (verb,
                                                           disk.iv_name)))

  return results
2299

    
2300

    
2301
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path

    def _TryRemove():
      # removal can fail transiently (e.g. device busy), so it is retried
      # below; returns a list of error messages (empty means success)
      try:
        rdev.Remove()
        return []
      except errors.BlockDeviceError, err:
        return [str(err)]

    msgs.extend(utils.SimpleRetry([], _TryRemove,
                                  constants.DISK_REMOVE_RETRY_INTERVAL,
                                  constants.DISK_REMOVE_RETRY_TIMEOUT))

    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  # remove the children even if the parent could not be removed, and
  # accumulate their errors too
  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))
2345

    
2346

    
2347
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # mcn is converted to the maximum number of children that may fail
    # (i.e. be None) while the device can still be assembled
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # too many children failed already: give up
        if children.count(None) >= mcn:
          raise
        # otherwise record the failure as None and keep going
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)
  else:
    # nothing to do on this node; signal success to the caller
    result = True
  return result
2398

    
2399

    
2400
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a tuple with the C{/dev/...} path and the created symlink
      for primary nodes, and (C{True}, C{True}) for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, BlockDev):
      # pylint: disable=E1103
      dev_path = result.dev_path
      link_name = None
      if as_primary:
        # only the primary node gets a /var/run disk symlink
        link_name = _SymlinkBlockDev(owner, dev_path, idx)
    elif result:
      # secondary node: _RecursiveAssembleBD returned True
      return result, result
    else:
      _Fail("Unexpected result from _RecursiveAssembleBD")
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  # reached only via the isinstance(result, BlockDev) branch above
  return dev_path, link_name
2428

    
2429

    
2430
def BlockdevShutdown(disk):
2431
  """Shut down a block device.
2432

2433
  First, if the device is assembled (Attach() is successful), then
2434
  the device is shutdown. Then the children of the device are
2435
  shutdown.
2436

2437
  This function is called recursively. Note that we don't cache the
2438
  children or such, as oppossed to assemble, shutdown of different
2439
  devices doesn't require that the upper device was active.
2440

2441
  @type disk: L{objects.Disk}
2442
  @param disk: the description of the disk we should
2443
      shutdown
2444
  @rtype: None
2445

2446
  """
2447
  msgs = []
2448
  r_dev = _RecursiveFindBD(disk)
2449
  if r_dev is not None:
2450
    r_path = r_dev.dev_path
2451
    try:
2452
      r_dev.Shutdown()
2453
      DevCacheManager.RemoveCache(r_path)
2454
    except errors.BlockDeviceError, err:
2455
      msgs.append(str(err))
2456

    
2457
  if disk.children:
2458
    for child in disk.children:
2459
      try:
2460
        BlockdevShutdown(child)
2461
      except RPCFail, err:
2462
        msgs.append(str(err))
2463

    
2464
  if msgs:
2465
    _Fail("; ".join(msgs))
2466

    
2467

    
2468
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  # every new child must have been found
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
2485

    
2486

    
2487
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  paths = []
  for disk in new_cdevs:
    static_path = disk.StaticDevPath()
    if static_path is not None:
      # the disk type knows its device path without attaching
      if not utils.IsNormAbsPath(static_path):
        _Fail("Strange path returned from StaticDevPath: '%s'", static_path)
      paths.append(static_path)
      continue
    # otherwise we have to attach to the device to learn its path
    found = _RecursiveFindBD(disk)
    if found is None:
      _Fail("Can't find device %s while removing children", disk)
    paths.append(found.dev_path)
  parent_bdev.RemoveChildren(paths)
2514

    
2515

    
2516
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise RPCFail: if any of the disks cannot be found

  """
  stats = []
  for disk in disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      _Fail("Can't find device %s", disk)
    stats.append(found.CombinedSyncStatus())
  return stats
2536

    
2537

    
2538
def BlockdevGetmirrorstatusMulti(disks):
2539
  """Get the mirroring status of a list of devices.
2540

2541
  @type disks: list of L{objects.Disk}
2542
  @param disks: the list of disks which we should query
2543
  @rtype: disk
2544
  @return: List of tuples, (bool, status), one for each disk; bool denotes
2545
    success/failure, status is L{objects.BlockDevStatus} on success, string
2546
    otherwise
2547

2548
  """
2549
  result = []
2550
  for disk in disks:
2551
    try:
2552
      rbd = _RecursiveFindBD(disk)
2553
      if rbd is None:
2554
        result.append((False, "Can't find device %s" % disk))
2555
        continue
2556

    
2557
      status = rbd.CombinedSyncStatus()
2558
    except errors.BlockDeviceError, err:
2559
      logging.exception("Error while getting disk status")
2560
      result.append((False, str(err)))
2561
    else:
2562
      result.append((True, status))
2563

    
2564
  assert len(disks) == len(result)
2565

    
2566
  return result
2567

    
2568

    
2569
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  # Resolve all children first so FindDevice can match the full tree
  if disk.children:
    found_children = [_RecursiveFindBD(child) for child in disk.children]
  else:
    found_children = []

  return bdev.FindDevice(disk, found_children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  found = _RecursiveFindBD(disk)
  if found is None:
    _Fail("Block device '%s' is not set up", disk)

  found.Open()
  return found


def BlockdevFind(disk):
2606
  """Check if a device is activated.
2607

2608
  If it is, return information about the real device.
2609

2610
  @type disk: L{objects.Disk}
2611
  @param disk: the disk to find
2612
  @rtype: None or objects.BlockDevStatus
2613
  @return: None if the disk cannot be found, otherwise a the current
2614
           information
2615

2616
  """
2617
  try:
2618
    rbd = _RecursiveFindBD(disk)
2619
  except errors.BlockDeviceError, err:
2620
    _Fail("Failed to find device: %s", err, exc=True)
2621

    
2622
  if rbd is None:
2623
    return None
2624

    
2625
  return rbd.GetSyncStatus()
2626

    
2627

    
2628
def BlockdevGetdimensions(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the pair (size, spindles), where spindles is None if the
      device doesn't support that

  """
  dimensions = []
  for disk in disks:
    # errors during lookup degrade to "not found" for that disk
    try:
      found = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      found = None
    if found is None:
      dimensions.append(None)
    else:
      dimensions.append(found.GetActualDimensions())
  return dimensions


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  # Security check: only files on the explicit whitelist may be
  # overwritten via this RPC
  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  # payload arrives (optionally) compressed over the wire
  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  # translate symbolic user/group names into numeric ids on this node
  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  cmd_result = utils.RunCmd([oob_program, command, node], timeout=timeout)
  if cmd_result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", cmd_result.cmd,
          cmd_result.fail_reason, cmd_result.output)
  return cmd_result.stdout


def _OSOndiskAPIVersion(os_dir):
2723
  """Compute and return the API version of a given OS.
2724

2725
  This function will try to read the API version of the OS residing in
2726
  the 'os_dir' directory.
2727

2728
  @type os_dir: str
2729
  @param os_dir: the directory in which we should look for the OS
2730
  @rtype: tuple
2731
  @return: tuple (status, data) with status denoting the validity and
2732
      data holding either the vaid versions or an error message
2733

2734
  """
2735
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2736

    
2737
  try:
2738
    st = os.stat(api_file)
2739
  except EnvironmentError, err:
2740
    return False, ("Required file '%s' not found under path %s: %s" %
2741
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2742

    
2743
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2744
    return False, ("File '%s' in %s is not a regular file" %
2745
                   (constants.OS_API_FILE, os_dir))
2746

    
2747
  try:
2748
    api_versions = utils.ReadFile(api_file).splitlines()
2749
  except EnvironmentError, err:
2750
    return False, ("Error while reading the API version file at %s: %s" %
2751
                   (api_file, utils.ErrnoOrStr(err)))
2752

    
2753
  try:
2754
    api_versions = [int(version.strip()) for version in api_versions]
2755
  except (TypeError, ValueError), err:
2756
    return False, ("API version(s) can't be converted to integer: %s" %
2757
                   str(err))
2758

    
2759
  return True, api_versions
2760

    
2761

    
2762
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of support OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    # non-existing search directories are silently skipped
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        # NOTE(review): "break" aborts the scan of ALL remaining search
        # directories, not just this one; "continue" may have been
        # intended — confirm before changing
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          # valid OS: os_inst is an objects.OS instance
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          # invalid OS: os_inst is the error message string
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  # locate the OS directory, either in the default search path or in
  # the explicitly-given base directory
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  # we must share at least one API version with the on-disk OS
  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  # variants file is optional from API 15 on
  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  # parameters file and verify script only exist from API 20 on
  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  # NOTE: the dict is mutated inside the loop; safe only because
  # Python 2's .items() returns a snapshot list
  for (filename, required) in os_files.items():
    # replace the True/False marker with the file's absolute path
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        # optional file missing: simply drop it from the result
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    # scripts (as opposed to data files) must be executable by owner
    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    # each line is "name[ help-text]"
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # strip any "+variant" suffix before the on-disk lookup
  status, payload = _TryOSFromDisk(objects.OS.GetName(name), base_dir)
  if not status:
    _Fail(payload)
  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  env = {}
  # negotiate the highest API version both sides support
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  env["OS_API_VERSION"] = "%d" % api_version
  env["OS_NAME"] = inst_os.name
  env["DEBUG_LEVEL"] = "%d" % debug

  # OS variants: explicit variant from the name, else the first
  # supported one; empty if variants are not applicable
  variant = ""
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name) or inst_os.supported_variants[0]
  env["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    env["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  env["PATH"] = constants.HOOKS_PATH

  return env


def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  # start from the instance-independent variables
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks: each disk must be activated, since its device path is exported
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    result["DISK_%d_UUID" % idx] = disk.uuid
    if disk.name:
      result["DISK_%d_NAME" % idx] = disk.name
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.DTS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type in constants.DTS_FILEBASED:
      # for file-based disks, also export the backing file path
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.logical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    result["NIC_%d_UUID" % idx] = nic.uuid
    if nic.name:
      result["NIC_%d_NAME" % idx] = nic.name
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.netinfo:
      # network-level settings are flattened into NIC_<idx>_* keys
      nobj = objects.Network.FromDict(nic.netinfo)
      result.update(nobj.HooksDict("NIC_%d_" % idx))
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list of L{objects.ExtStorage}
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    # non-existing search directories are silently skipped
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        # NOTE(review): "break" aborts the scan of ALL remaining search
        # directories; mirrors the same construct in DiagnoseOS — confirm
        # whether "continue" was intended
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          # valid provider: es_inst is an ExtStorage object
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          # invalid provider: es_inst is the error message string
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result


def BlockdevGrow(disk, amount, dryrun, backingstore, excl_stor):
3099
  """Grow a stack of block devices.
3100

3101
  This function is called recursively, with the childrens being the
3102
  first ones to resize.
3103

3104
  @type disk: L{objects.Disk}
3105
  @param disk: the disk to be grown
3106
  @type amount: integer
3107
  @param amount: the amount (in mebibytes) to grow with
3108
  @type dryrun: boolean
3109
  @param dryrun: whether to execute the operation in simulation mode
3110
      only, without actually increasing the size
3111
  @param backingstore: whether to execute the operation on backing storage
3112
      only, or on "logical" storage only; e.g. DRBD is logical storage,
3113
      whereas LVM, file, RBD are backing storage
3114
  @rtype: (status, result)
3115
  @type excl_stor: boolean
3116
  @param excl_stor: Whether exclusive_storage is active
3117
  @return: a tuple with the status of the operation (True/False), and
3118
      the errors message if status is False
3119

3120
  """
3121
  r_dev = _RecursiveFindBD(disk)
3122
  if r_dev is None:
3123
    _Fail("Cannot find block device %s", disk)
3124

    
3125
  try:
3126
    r_dev.Grow(amount, dryrun, backingstore, excl_stor)
3127
  except errors.BlockDeviceError, err:
3128
    _Fail("Failed to grow block device: %s", err, exc=True)
3129

    
3130

    
3131
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.DT_DRBD8:
    # DRBD: recurse into the (single) backing child
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])

  if disk.dev_type == constants.DT_PLAIN:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is None:
      _Fail("Cannot find block device %s", disk)
    # FIXME: choose a saner value for the snapshot size
    # let's stay on the safe side and ask for the full size, for now
    return r_dev.Snapshot(disk.size)

  _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
        disk.unique_id, disk.dev_type)


def BlockdevSetInfo(disk, info):
3162
  """Sets 'metadata' information on block devices.
3163

3164
  This function sets 'info' metadata on block devices. Initial
3165
  information is set at device creation; this function should be used
3166
  for example after renames.
3167

3168
  @type disk: L{objects.Disk}
3169
  @param disk: the disk to be grown
3170
  @type info: string
3171
  @param info: new 'info' metadata
3172
  @rtype: (status, result)
3173
  @return: a tuple with the status of the operation (True/False), and
3174
      the errors message if status is False
3175

3176
  """
3177
  r_dev = _RecursiveFindBD(disk)
3178
  if r_dev is None:
3179
    _Fail("Cannot find block device %s", disk)
3180

    
3181
  try:
3182
    r_dev.SetInfo(info)
3183
  except errors.BlockDeviceError, err:
3184
    _Fail("Failed to set information on block device: %s", err, exc=True)
3185

    
3186

    
3187
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  # write into a ".new" staging dir, then atomically swap it in below
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  # instance-level metadata
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  # one nic%d_* group per NIC
  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
               "%s" % nic.network)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  # one disk%d_* group per successfully snapshotted disk; entries that
  # are None (failed snapshots) are skipped
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.logical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    # cluster-global hypervisor settings are deliberately not exported
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # replace any previous export for this instance with the new one
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  config = objects.SerializableConfigParser()
  config.read(utils.PathJoin(dest, constants.EXPORT_CONF_FILE))

  # both the export and the instance sections must be present
  has_required = (config.has_section(constants.INISECT_EXP) and
                  config.has_section(constants.INISECT_INS))
  if not has_required:
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  # guard clause: without the directory there is nothing to list
  if not os.path.isdir(pathutils.EXPORT_DIR):
    _Fail("No exports directory")
  return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))


def RemoveExport(export):
3312
  """Remove an existing export from the node.
3313

3314
  @type export: str
3315
  @param export: the name of the export to remove
3316
  @rtype: None
3317

3318
  """
3319
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)
3320

    
3321
  try:
3322
    shutil.rmtree(target)
3323
  except EnvironmentError, err:
3324
    _Fail("Error while removing the export: %s", err, exc=True)
3325

    
3326

    
3327
def BlockdevRename(devlist):
3328
  """Rename a list of block devices.
3329

3330
  @type devlist: list of tuples
3331
  @param devlist: list of tuples of the form  (disk, new_unique_id); disk is
3332
      an L{objects.Disk} object describing the current disk, and new
3333
      unique_id is the name we rename it to
3334
  @rtype: boolean
3335
  @return: True if all renames succeeded, False otherwise
3336

3337
  """
3338
  msgs = []
3339
  result = True
3340
  for disk, unique_id in devlist:
3341
    dev = _RecursiveFindBD(disk)
3342
    if dev is None:
3343
      msgs.append("Can't find device %s in rename" % str(disk))
3344
      result = False
3345
      continue
3346
    try:
3347
      old_rpath = dev.dev_path
3348
      dev.Rename(unique_id)
3349
      new_rpath = dev.dev_path
3350
      if old_rpath != new_rpath:
3351
        DevCacheManager.RemoveCache(old_rpath)
3352
        # FIXME: we should add the new cache information here, like:
3353
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
3354
        # but we don't have the owner here - maybe parse from existing
3355
        # cache? for now, we only lose lvm data when we rename, which
3356
        # is less critical than DRBD or MD
3357
    except errors.BlockDeviceError, err:
3358
      msgs.append("Can't rename device '%s' to '%s': %s" %
3359
                  (dev, unique_id, err))
3360
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
3361
      result = False
3362
  if not result:
3363
    _Fail("; ".join(msgs))
3364

    
3365

    
3366
def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  # raises if the path is outside the allowed storage roots
  filestorage.CheckFileStoragePath(fs_dir)
  return os.path.normpath(fs_dir)


def CreateFileStorageDir(file_storage_dir):
3385
  """Create file storage directory.
3386

3387
  @type file_storage_dir: str
3388
  @param file_storage_dir: directory to create
3389

3390
  @rtype: tuple
3391
  @return: tuple with first element a boolean indicating wheter dir
3392
      creation was successful or not
3393

3394
  """
3395
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
3396
  if os.path.exists(file_storage_dir):
3397
    if not os.path.isdir(file_storage_dir):
3398
      _Fail("Specified storage dir '%s' is not a directory",
3399
            file_storage_dir)
3400
  else:
3401
    try:
3402
      os.makedirs(file_storage_dir, 0750)
3403
    except OSError, err:
3404
      _Fail("Cannot create file storage directory '%s': %s",
3405
            file_storage_dir, err, exc=True)
3406

    
3407

    
3408
def RemoveFileStorageDir(file_storage_dir):
3409
  """Remove file storage directory.
3410

3411
  Remove it only if it's empty. If not log an error and return.
3412

3413
  @type file_storage_dir: str
3414
  @param file_storage_dir: the directory we should cleanup
3415
  @rtype: tuple (success,)
3416
  @return: tuple of one element, C{success}, denoting
3417
      whether the operation was successful
3418

3419
  """
3420
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
3421
  if os.path.exists(file_storage_dir):
3422
    if not os.path.isdir(file_storage_dir):
3423
      _Fail("Specified Storage directory '%s' is not a directory",
3424
            file_storage_dir)
3425
    # deletes dir only if empty, otherwise we want to fail the rpc call
3426
    try:
3427
      os.rmdir(file_storage_dir)
3428
    except OSError, err:
3429
      _Fail("Cannot remove file storage directory '%s': %s",
3430
            file_storage_dir, err)
3431

    
3432

    
3433
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
3434
  """Rename the file storage directory.
3435

3436
  @type old_file_storage_dir: str
3437
  @param old_file_storage_dir: the current path
3438
  @type new_file_storage_dir: str
3439
  @param new_file_storage_dir: the name we should rename to
3440
  @rtype: tuple (success,)
3441
  @return: tuple of one element, C{success}, denoting
3442
      whether the operation was successful
3443

3444
  """
3445
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
3446
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
3447
  if not os.path.exists(new_file_storage_dir):
3448
    if os.path.isdir(old_file_storage_dir):
3449
      try:
3450
        os.rename(old_file_storage_dir, new_file_storage_dir)
3451
      except OSError, err:
3452
        _Fail("Cannot rename '%s' to '%s': %s",
3453
              old_file_storage_dir, new_file_storage_dir, err)
3454
    else:
3455
      _Fail("Specified storage dir '%s' is not a directory",
3456
            old_file_storage_dir)
3457
  else:
3458
    if os.path.exists(old_file_storage_dir):
3459
      _Fail("Cannot rename '%s' to '%s': both locations exist",
3460
            old_file_storage_dir, new_file_storage_dir)
3461

    
3462

    
3463
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  # Accept only paths strictly below the queue directory; anything else
  # could be an attempt to read/write outside of it
  if utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    return

  _Fail("Passed job queue file '%s' does not belong to"
        " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  # Translate the virtual cluster path into a real one, then make sure
  # it still points inside the queue directory
  file_name = vcluster.LocalizeVirtualPath(file_name)
  _EnsureJobQueueFile(file_name)

  ents = runtime.GetEnts()
  payload = _Decompress(content)

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=payload, uid=ents.masterd_uid,
                  gid=ents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)


def JobQueueRename(old, new):
3502
  """Renames a job queue file.
3503

3504
  This is just a wrapper over os.rename with proper checking.
3505

3506
  @type old: str
3507
  @param old: the old (actual) file name
3508
  @type new: str
3509
  @param new: the desired file name
3510
  @rtype: tuple
3511
  @return: the success of the operation and payload
3512

3513
  """
3514
  old = vcluster.LocalizeVirtualPath(old)
3515
  new = vcluster.LocalizeVirtualPath(new)
3516

    
3517
  _EnsureJobQueueFile(old)
3518
  _EnsureJobQueueFile(new)
3519

    
3520
  getents = runtime.GetEnts()
3521

    
3522
  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
3523
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)
3524

    
3525

    
3526
def BlockdevClose(instance_name, disks):
3527
  """Closes the given block devices.
3528

3529
  This means they will be switched to secondary mode (in case of
3530
  DRBD).
3531

3532
  @param instance_name: if the argument is not empty, the symlinks
3533
      of this instance will be removed
3534
  @type disks: list of L{objects.Disk}
3535
  @param disks: the list of disks to be closed
3536
  @rtype: tuple (success, message)
3537
  @return: a tuple of success and message, where success
3538
      indicates the succes of the operation, and message
3539
      which will contain the error details in case we
3540
      failed
3541

3542
  """
3543
  bdevs = []
3544
  for cf in disks:
3545
    rd = _RecursiveFindBD(cf)
3546
    if rd is None:
3547
      _Fail("Can't find device %s", cf)
3548
    bdevs.append(rd)
3549

    
3550
  msg = []
3551
  for rd in bdevs:
3552
    try:
3553
      rd.Close()
3554
    except errors.BlockDeviceError, err:
3555
      msg.append(str(err))
3556
  if msg:
3557
    _Fail("Can't make devices secondary: %s", ",".join(msg))
3558
  else:
3559
    if instance_name:
3560
      _RemoveBlockDevLinks(instance_name, disks)
3561

    
3562

    
3563
def ValidateHVParams(hvname, hvparams):
3564
  """Validates the given hypervisor parameters.
3565

3566
  @type hvname: string
3567
  @param hvname: the hypervisor name
3568
  @type hvparams: dict
3569
  @param hvparams: the hypervisor parameters to be validated
3570
  @rtype: None
3571

3572
  """
3573
  try:
3574
    hv_type = hypervisor.GetHypervisor(hvname)
3575
    hv_type.ValidateParameters(hvparams)
3576
  except errors.HypervisorError, err:
3577
    _Fail(str(err), log=False)
3578

    
3579

    
3580
def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  # supported_parameters holds (name, description) pairs; compare names only
  known = frozenset(entry[0] for entry in os_obj.supported_parameters)
  unknown = frozenset(parameters) - known
  if unknown:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(unknown)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters, some of which may be
                   private.
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  # Reject check names we don't know how to run
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  # Strip any variant suffix before looking the OS up on disk
  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  # On lookup failure tbv holds the error message, not an OS object
  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  # Pre-2.0-API OSes have no validation support; treat them as valid
  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  # Run the OS' own verify script with a clean, purpose-built environment
  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    # log=False: the error was already logged above with full detail
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  Backs up and removes the local cluster configuration file; refuses to
  run if this node appears to be the master or the master daemon is
  still running.

  @rtype: None
  @raise RPCFail: if the node is (or looks like) the master, or the
      config backup fails

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  # daemon-util "check" exits 0 when the daemon runs, hence "not failed"
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  # Keep a backup of the config before deleting it; a missing file
  # (ENOENT) is fine -- there is simply nothing to back up
  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @type cryptodir: string
  @param cryptodir: base directory holding per-certificate subdirectories
  @type name: string
  @param name: certificate (subdirectory) name
  @rtype: tuple; (string, string, string)
  @return: certificate directory, key file path, certificate file path

  """
  cert_subdir = utils.PathJoin(cryptodir, name)
  return (cert_subdir,
          utils.PathJoin(cert_subdir, _X509_KEY_FILE),
          utils.PathJoin(cert_subdir, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds (capped at _MAX_SSL_CERT_VALIDITY)
  @type cryptodir: string
  @param cryptodir: directory in which the certificate directory is created
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  # mkdtemp gives us a unique, private directory; its basename doubles
  # as the certificate's externally-visible name
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    # 0400: key material must be readable by the owner only
    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    # Don't leave a half-written certificate directory behind
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3709
  """Removes a X509 certificate.
3710

3711
  @type name: string
3712
  @param name: Certificate name
3713

3714
  """
3715
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3716

    
3717
  utils.RemoveFile(key_file)
3718
  utils.RemoveFile(cert_file)
3719

    
3720
  try:
3721
    os.rmdir(cert_dir)
3722
  except EnvironmentError, err:
3723
    _Fail("Cannot remove certificate directory '%s': %s",
3724
          cert_dir, err)
3725

    
3726

    
3727
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  Builds the shell command fragments (prefix/suffix) that the
  import/export daemon wraps around its data stream, depending on
  whether data comes from/goes to a plain file, a raw block device or
  an OS import/export script.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode (constants.IEM_IMPORT or IEM_EXPORT)
  @param ieio: Input/output type (one of the constants.IEIO_* values)
  @param ieargs: Input/output arguments; shape depends on C{ieio}:
      (filename,), (disk,) or (disk, disk_index)
  @return: tuple of (environment dict or None, command prefix or None,
      command suffix or None, expected size in MiB or None)

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    # Resolve symlinks so the containment check below can't be bypassed
    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      # import: redirect the received stream into the file
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      # export: feed the file into the stream
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we use nocreat to fail if the device is not already there or we pass a
      # wrong path; we use notrunc to no attempt truncate on an LV device
      suffix = utils.BuildShellCmd("| dd of=%s conv=nocreat,notrunc bs=%s",
                                   real_disk.dev_path,
                                   str(1024 * 1024)) # 1 MB

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      real_disk = _OpenRealBD(disk)
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  @type prefix: string
  @param prefix: first component of the directory name
  @rtype: string
  @return: path of the newly created, uniquely named directory

  """
  timestamp = utils.TimestampForFilename()
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" % (prefix, timestamp)))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/output mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments
  @return: name of the started import/export (the status directory's
      basename)

  """
  # An import listens locally (no peer given); an export needs a peer
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  # key_name and ca_pem must be given together: either a dedicated
  # certificate is used for both, or the cluster certificate for both
  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    # Remove the status directory again on any failure after its creation
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
3970
  """Returns import/export daemon status.
3971

3972
  @type names: sequence
3973
  @param names: List of names
3974
  @rtype: List of dicts
3975
  @return: Returns a list of the state of each named import/export or None if a
3976
           status couldn't be read
3977

3978
  """
3979
  result = []
3980

    
3981
  for name in names:
3982
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
3983
                                 _IES_STATUS_FILE)
3984

    
3985
    try:
3986
      data = utils.ReadFile(status_file)
3987
    except EnvironmentError, err:
3988
      if err.errno != errno.ENOENT:
3989
        raise
3990
      data = None
3991

    
3992
    if not data:
3993
      result.append(None)
3994
      continue
3995

    
3996
    result.append(serializer.LoadJson(data))
3997

    
3998
  return result
3999

    
4000

    
4001
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @type name: string
  @param name: import/export name (status directory basename)

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  # No (locked) PID file means the daemon is not running; nothing to do
  if not pid:
    return

  logging.info("Import/export %s is running with PID %s, sending SIGTERM",
               name, pid)
  utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @type name: string
  @param name: import/export name (status directory basename)

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)

  daemon_pid = utils.ReadLockedPidFile(pid_file)
  if daemon_pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, daemon_pid)
    utils.KillProcess(daemon_pid, waitpid=False)

  # Remove the status directory whether or not a daemon had to be killed
  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(disks):
  """Finds attached L{BlockDev}s for the given disks.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we need to find

  @return: list of L{BlockDev} objects or C{None} if a given disk
           was not found or was no attached.

  """
  found = []

  for disk in disks:
    blockdev = _RecursiveFindBD(disk)
    # Any unresolvable disk aborts the whole lookup
    if blockdev is None:
      _Fail("Can't find device %s", disk)
    found.append(blockdev)

  return found


def DrbdDisconnectNet(disks):
4058
  """Disconnects the network on a list of drbd devices.
4059

4060
  """
4061
  bdevs = _FindDisks(disks)
4062

    
4063
  # disconnect disks
4064
  for rd in bdevs:
4065
    try:
4066
      rd.DisconnectNet()
4067
    except errors.BlockDeviceError, err:
4068
      _Fail("Can't change network configuration to standalone mode: %s",
4069
            err, exc=True)
4070

    
4071

    
4072
def DrbdAttachNet(disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  @type disks: list of L{objects.Disk}
  @param disks: disks whose DRBD devices should (re)connect
  @type instance_name: string
  @param instance_name: instance owning the disks; used for the symlinks
      created in multimaster mode
  @type multimaster: boolean
  @param multimaster: whether both nodes should end up primary
      (e.g. for live migration)

  """
  bdevs = _FindDisks(disks)

  if multimaster:
    # Symlinks are needed because the device will be promoted to
    # primary on this node as well (see rd.Open() below)
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    # Retry callback: raises RetryAgain until every device is connected
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      if multimaster:
        # In the multimaster case we have to wait explicitly until
        # the resource is Connected and UpToDate/UpToDate, because
        # we promote *both nodes* to primary directly afterwards.
        # Being in resync is not enough, since there is a race during which we
        # may promote a node with an Outdated disk to primary, effectively
        # tearing down the connection.
        all_connected = (all_connected and
                         stats.is_connected and
                         stats.is_disk_uptodate and
                         stats.peer_disk_uptodate)
      else:
        all_connected = (all_connected and
                         (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 miliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(disks):
  """Wait until DRBDs have synchronized.

  @type disks: list of L{objects.Disk}
  @param disks: disks whose DRBD devices to poll
  @rtype: tuple; (boolean, int)
  @return: whether all devices have finished resyncing, and the minimum
      sync percentage seen across the devices

  """
  def _helper(rd):
    # Raise RetryAgain while the device is neither connected nor resyncing
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


def DrbdNeedsActivation(disks):
  """Checks which of the passed disks needs activation and returns their UUIDs.

  @type disks: list of L{objects.Disk}
  @param disks: the disks to inspect
  @rtype: list of strings
  @return: UUIDs of the disks that are missing, standalone or diskless

  """
  faulty_uuids = []

  for disk in disks:
    blockdev = _RecursiveFindBD(disk)
    # Unresolvable device: definitely needs activation
    if blockdev is None:
      faulty_uuids.append(disk.uuid)
      continue

    stats = blockdev.GetProcStatus()
    if stats.is_standalone or stats.is_diskless:
      faulty_uuids.append(disk.uuid)

  return faulty_uuids


def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  @rtype: string
  @return: the usermode helper as reported by DRBD
  @raise RPCFail: if the helper cannot be determined

  """
  try:
    return drbd.DRBD8.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type, hvparams=None):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @type hypervisor_type: string
  @param hypervisor_type: hypervisor whose powercycle method to use
  @type hvparams: dict or None
  @param hvparams: hypervisor parameters passed through to the
      hypervisor's PowercycleNode
  @rtype: string
  @return: confirmation message (only reached in the parent process)

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # Parent: return immediately so the RPC can complete before reboot
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  # Give the RPC answer time to reach the caller before powercycling
  time.sleep(5)
  hyper.PowercycleNode(hvparams=hvparams)


def _VerifyRestrictedCmdName(cmd):
  """Verifies a restricted command name.

  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  if not cmd.strip():
    result = (False, "Missing command name")
  elif os.path.basename(cmd) != cmd:
    # A path separator would allow escaping the restricted-command dir
    result = (False, "Invalid command name")
  elif not constants.EXT_PLUGIN_MASK.match(cmd):
    result = (False, "Command name contains forbidden characters")
  else:
    result = (True, None)

  return result


def _CommonRestrictedCmdCheck(path, owner):
4252
  """Common checks for restricted command file system directories and files.
4253

4254
  @type path: string
4255
  @param path: Path to check
4256
  @param owner: C{None} or tuple containing UID and GID
4257
  @rtype: tuple; (boolean, string or C{os.stat} result)
4258
  @return: The tuple's first element is the status; if C{False}, the second
4259
    element is an error message string, otherwise it's the result of C{os.stat}
4260

4261
  """
4262
  if owner is None:
4263
    # Default to root as owner
4264
    owner = (0, 0)
4265

    
4266
  try:
4267
    st = os.stat(path)
4268
  except EnvironmentError, err:
4269
    return (False, "Can't stat(2) '%s': %s" % (path, err))
4270

    
4271
  if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
4272
    return (False, "Permissions on '%s' are too permissive" % path)
4273

    
4274
  if (st.st_uid, st.st_gid) != owner:
4275
    (owner_uid, owner_gid) = owner
4276
    return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))
4277

    
4278
  return (True, st)
4279

    
4280

    
4281
def _VerifyRestrictedCmdDirectory(path, _owner=None):
  """Checks whether the restricted command directory is safe to use.

  @type path: string
  @param path: Path to check
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  (status, value) = _CommonRestrictedCmdCheck(path, _owner)
  if not status:
    return (False, value)

  # The common check returns the stat(2) result on success
  if stat.S_ISDIR(value.st_mode):
    return (True, None)

  return (False, "Path '%s' is not a directory" % path)
4300

    
4301

    
4302
def _VerifyRestrictedCmd(path, cmd, _owner=None):
  """Resolves and verifies the executable for a restricted command.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise the second element is the
    absolute path to the executable

  """
  executable = utils.PathJoin(path, cmd)

  # The executable must satisfy the same ownership/permission rules as
  # the directory itself
  (status, detail) = _CommonRestrictedCmdCheck(executable, _owner)
  if not status:
    return (False, detail)

  if utils.IsExecutable(executable):
    return (True, executable)

  return (False, "access(2) thinks '%s' can't be executed" % executable)
4326

    
4327

    
4328
def _PrepareRestrictedCmd(path, cmd,
                          _verify_dir=_VerifyRestrictedCmdDirectory,
                          _verify_name=_VerifyRestrictedCmdName,
                          _verify_cmd=_VerifyRestrictedCmd):
  """Performs a number of tests on a restricted command.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @return: Same as L{_VerifyRestrictedCmd}

  """
  # The containing directory must be safe before anything else is looked at
  (status, msg) = _verify_dir(path)
  if not status:
    return (False, msg)

  # Then the command name itself
  (status, msg) = _verify_name(cmd)
  if not status:
    return (False, msg)

  # Finally the actual executable
  return _verify_cmd(path, cmd)
4352

    
4353

    
4354
def RunRestrictedCmd(cmd,
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
                     _sleep_fn=time.sleep,
                     _prepare_fn=_PrepareRestrictedCmd,
                     _runcmd_fn=utils.RunCmd,
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
  """Executes a restricted command after performing strict tests.

  @type cmd: string
  @param cmd: Command name
  @rtype: string
  @return: Command output
  @raise RPCFail: In case of an error

  """
  logging.info("Preparing to run restricted command '%s'", cmd)

  if not _enabled:
    _Fail("Restricted commands disabled at configure time")

  cmdlock = None
  try:
    runresult = None
    try:
      # Serialize restricted command execution on this node
      cmdlock = utils.FileLock.Open(_lock_file)
      cmdlock.Exclusive(blocking=True, timeout=_lock_timeout)

      (verified, payload) = _prepare_fn(_path, cmd)

      if verified:
        # The lock is released in the child as soon as it's forked, via
        # the postfork hook, so a long-running command doesn't block others
        runresult = _runcmd_fn([payload], env={}, reset_env=True,
                               postfork_fn=lambda _: cmdlock.Unlock())
      else:
        logging.error(payload)
    except Exception: # pylint: disable=W0703
      # Keep original error in log
      logging.exception("Caught exception")

    if runresult is None:
      # Pause before reporting the failure
      logging.info("Sleeping for %0.1f seconds before returning",
                   _RCMD_INVALID_DELAY)
      _sleep_fn(_RCMD_INVALID_DELAY)

      # Do not include original error message in returned error
      _Fail("Executing command '%s' failed" % cmd)

    if runresult.failed or runresult.fail_reason:
      _Fail("Restricted command '%s' failed: %s; output: %s",
            cmd, runresult.fail_reason, runresult.output)

    return runresult.output
  finally:
    if cmdlock is not None:
      # Release lock at last
      cmdlock.Close()
      cmdlock = None
4411

    
4412

    
4413
def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    logging.info("Received request to no longer pause watcher")
    utils.RemoveFile(_filename)
  else:
    logging.info("Received request to pause watcher until %s", until)

    if not ht.TNumber(until):
      _Fail("Duration must be numeric")

    # FIX: use the explicit 0o octal prefix (PEP 3127, valid since
    # Python 2.6) instead of the legacy 0644 form; same value 0o644
    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0o644)
4430

    
4431

    
4432
def ConfigureOVS(ovs_name, ovs_link):
  """Sets up an OpenvSwitch on this node.

  The switch is created with the given name and, if an Ethernet device
  is given, connected to it for outside connectivity.

  @type ovs_name: string
  @param ovs_name: Name of the OpenvSwitch to create.
  @type ovs_link: None or string
  @param ovs_link: Ethernet device for outside connection (can be missing)

  """
  # Initialize the OpenvSwitch
  bridge_result = utils.RunCmd(["ovs-vsctl", "add-br", ovs_name])
  if bridge_result.failed:
    _Fail("Failed to create openvswitch. Script return value: %s, output: '%s'"
          % (bridge_result.exit_code, bridge_result.output), log=True)

  # And connect it to a physical interface, if given
  if ovs_link:
    port_result = utils.RunCmd(["ovs-vsctl", "add-port", ovs_name, ovs_link])
    if port_result.failed:
      _Fail("Failed to connect openvswitch to  interface %s. Script return"
            " value: %s, output: '%s'" % (ovs_link, port_result.exit_code,
            port_result.output), log=True)
4457

    
4458

    
4459
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # _BASE_DIR is used like a constant, hence the non-standard attribute
    # name
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    hook_results = self.RunHooks(hpath, phase, env)

    # Wrap the result in the format expected by HooksMaster
    return {node: (None, False, hook_results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    hook_results = []

    # A missing or non-directory hooks dir simply means "no hooks"; avoid
    # logging a warning on every operation
    if not os.path.isdir(dir_name):
      return hook_results

    for (fname, fstat, fresult) in utils.RunParts(dir_name, env=env,
                                                  reset_env=True):
      if fstat == constants.RUNPARTS_SKIP:
        outcome = constants.HKR_SKIP
        output = ""
      elif fstat == constants.RUNPARTS_ERR:
        outcome = constants.HKR_FAIL
        output = "Hook script execution error: %s" % fresult
      elif fstat == constants.RUNPARTS_RUN:
        if fresult.failed:
          outcome = constants.HKR_FAIL
        else:
          outcome = constants.HKR_SUCCESS
        output = utils.SafeEncode(fresult.output.strip())
      hook_results.append(("%s/%s" % (subdir, fname), outcome, output))

    return hook_results
4551

    
4552

    
4553
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata, ial_params):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data
    @type ial_params: list
    @param ial_params: the iallocator parameters

    @rtype: str
    @return: the standard output of the allocator script
    @raise RPCFail: if the script can't be found or exits with an error

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # The input data is handed over via a temporary file whose name is
    # passed as the first script argument
    (fd, fin_name) = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name] + ial_params)
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
4594

    
4595

    
4596
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node nor not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    # Cache updates are best-effort; failures are only logged
    except EnvironmentError as err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    # Cache removal is best-effort; failures are only logged
    except EnvironmentError as err:
      # FIX: this previously logged "Can't update bdev cache", copy-pasted
      # from UpdateCache; the message now matches the removal operation
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)