#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103,C0302

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint

# C0302: This module has become too big and should be split up


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti.storage import bdev
from ganeti.storage import drbd
from ganeti.storage import filestorage
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster
from ganeti import ht
from ganeti.storage.base import BlockDev
from ganeti.storage.drbd import DRBD8
from ganeti import hooksmaster


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
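
# Illustrative example of a line accepted by _LVSLINE_REGEX, as produced by
# the "lvs" invocations further below (vg_name, lv_name, lv_size and lv_attr,
# "|"-separated):
#   "  xenvg|test1|20.06|-wi-ao----"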

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)
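# (i.e. at most mode 0755: owner rwx, group and others r-x)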

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _GetInstReasonFilename(instance_name):
  """Path of the file containing the reason of the instance status change.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)


def _StoreInstReasonTrail(instance_name, trail):
  """Serialize a reason trail related to an instance change of state to file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: None

  """
  json = serializer.DumpJson(trail)
  filename = _GetInstReasonFilename(instance_name)
  utils.WriteFile(filename, data=json)
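
# Illustrative example: for an instance named "inst1.example.com" the trail
# ends up JSON-serialized in
# pathutils.INSTANCE_REASON_DIR + "/inst1.example.com".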


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
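
# Usage sketch (names illustrative only): _Fail("Can't find device %s",
# dev_path, exc=True) logs the interpolated message, including the current
# exception's traceback because of exc=True, then raises RPCFail with it.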


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
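
# Illustrative round trips:
#   _Decompress((constants.RPC_ENCODING_NONE, "text"))
#     -> "text"
#   _Decompress((constants.RPC_ENCODING_ZLIB_BASE64,
#                base64.b64encode(zlib.compress("text"))))
#     -> "text"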


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: tuple
  @return: True, None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterNodeName():
  """Returns the master node name.

  @rtype: string
  @return: name of the master node
  @raise RPCFail: in case of errors

  """
  try:
    return _GetConfig().GetMasterNode()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
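
# Usage sketch: the decorator is applied below as e.g.
#   @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
#                  _BuildMasterIpEnv)
#   def ActivateMasterIp(master_params, use_external_mip_script): ...
# so the "master-ip-turnup" pre- and post-hooks run locally around the call.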


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    _RunLocalHooks decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _CheckStorageParams(params, num_params):
  """Performs sanity checks for storage parameters.

  @type params: list
  @param params: list of storage parameters
  @type num_params: int
  @param num_params: expected number of parameters

  """
  if params is None:
    raise errors.ProgrammerError("No storage parameters for storage"
                                 " reporting were provided.")
  if not isinstance(params, list):
    raise errors.ProgrammerError("The storage parameters are not of type"
                                 " list: '%s'" % params)
  if len(params) != num_params:
    raise errors.ProgrammerError("Did not receive the expected number of"
                                 " storage parameters: expected %s,"
                                 " received '%s'" % (num_params, len(params)))


def _CheckLvmStorageParams(params):
  """Performs sanity check for the 'exclusive storage' flag.

  @see: C{_CheckStorageParams}

  """
  _CheckStorageParams(params, 1)
  excl_stor = params[0]
  if not isinstance(excl_stor, bool):
    raise errors.ProgrammerError("Exclusive storage parameter is not"
                                 " boolean: '%s'." % excl_stor)
  return excl_stor
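
# Illustrative checks: _CheckLvmStorageParams([True]) returns True, while
# _CheckLvmStorageParams([]) or _CheckLvmStorageParams(None) raise
# errors.ProgrammerError via _CheckStorageParams.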


def _GetLvmVgSpaceInfo(name, params):
  """Wrapper around C{_GetVgInfo} which checks the storage parameters.

  @type name: string
  @param name: name of the volume group
  @type params: list
  @param params: list of storage parameters, which in this case should
    contain only one, for exclusive storage

  """
  excl_stor = _CheckLvmStorageParams(params)
  return _GetVgInfo(name, excl_stor)


def _GetVgInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVGInfo):
  """Retrieves information about an LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = info_fn([name], excl_stor)
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "type": constants.ST_LVM_VG,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }
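
# Example return value (illustrative), for a volume group "xenvg" of
# 102400 MiB with 20480 MiB free:
#   {"type": constants.ST_LVM_VG, "name": "xenvg",
#    "storage_free": 20480, "storage_size": 102400}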


def _GetLvmPvSpaceInfo(name, params):
  """Wrapper around C{_GetVgSpindlesInfo} with sanity checks.

  @see: C{_GetLvmVgSpaceInfo}

  """
  excl_stor = _CheckLvmStorageParams(params)
  return _GetVgSpindlesInfo(name, excl_stor)


def _GetVgSpindlesInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVgSpindlesInfo):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary whose keys are "type", "name", "storage_free" and
      "storage_size" for the storage type, VG name, free spindles and
      total spindles respectively

  """
  if excl_stor:
    (vg_free, vg_size) = info_fn(name)
  else:
    vg_free = 0
    vg_size = 0
  return {
    "type": constants.ST_LVM_PV,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }


def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) RAM in MiB
    - memory_total is the total amount of RAM in MiB
    - hv_version: the hypervisor version, if available

  @type hvparams: dict of string
  @param hvparams: the hypervisor's hvparams

  """
  return get_hv_fn(name).GetNodeInfo(hvparams=hvparams)


def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information for all hypervisors.

  See C{_GetHvInfo} for information on the output.

  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams

  """
  if hv_specs is None:
    return None

  result = []
  for hvname, hvparams in hv_specs:
    result.append(_GetHvInfo(hvname, hvparams, get_hv_fn))
  return result


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list of results.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(storage_units, hv_specs):
  """Gives back a hash with different information about the node.

  @type storage_units: list of tuples (string, string, list)
  @param storage_units: List of tuples (storage unit, identifier, parameters) to
    ask for disk space information. In case of lvm-vg, the identifier is
    the VG name. The parameters can contain additional, storage-type-specific
    parameters, for example exclusive storage for lvm storage.
  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
  @rtype: tuple; (string, None/list, None/list)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  storage_info = _GetNamedNodeInfo(
    storage_units,
    (lambda (storage_type, storage_key, storage_params):
        _ApplyStorageInfoFunction(storage_type, storage_key, storage_params)))
  hv_info = _GetHvInfoAll(hv_specs)
  return (bootid, storage_info, hv_info)
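
# Illustrative call:
#   GetNodeInfo([(constants.ST_LVM_VG, "xenvg", [False])],
#               [(constants.HT_XEN_PVM, {})])
# returns (boot_id, [vg info dict], [hypervisor info dict]).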


def _GetFileStorageSpaceInfo(path, params):
  """Wrapper around filestorage.GetSpaceInfo.

  The purpose of this wrapper is to call filestorage.GetFileStorageSpaceInfo
  and ignore the *args parameter to not leak it into the filestorage
  module's code.

  @see: C{filestorage.GetFileStorageSpaceInfo} for description of the
    parameters.

  """
  _CheckStorageParams(params, 0)
  return filestorage.GetFileStorageSpaceInfo(path)


# FIXME: implement storage reporting for all missing storage types.
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: _GetFileStorageSpaceInfo,
  constants.ST_LVM_PV: _GetLvmPvSpaceInfo,
  constants.ST_LVM_VG: _GetLvmVgSpaceInfo,
  constants.ST_SHARED_FILE: None,
  constants.ST_RADOS: None,
}


def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the results of the application of the storage space function (see
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
    storage type
  @raises NotImplementedError: for storage types that don't support space
    reporting yet
  """
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
  if fn is not None:
    return fn(storage_key, *args)
  else:
    raise NotImplementedError
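
# Illustrative dispatch: _ApplyStorageInfoFunction(constants.ST_LVM_VG,
# "xenvg", [True]) calls _GetLvmVgSpaceInfo("xenvg", [True]), while any
# storage type mapped to None above raises NotImplementedError.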


def _CheckExclusivePvs(pvi_list):
  """Check that PVs are not shared among LVs

  @type pvi_list: list of L{objects.LvmPvInfo} objects
  @param pvi_list: information about the PVs

  @rtype: list of tuples (string, list of strings)
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])

  """
  res = []
  for pvi in pvi_list:
    if len(pvi.lv_list) > 1:
      res.append((pvi.name, pvi.lv_list))
  return res
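
# Illustrative result: a PV "pv1" hosting LVs "lv1" and "lv2" is reported as
#   [("pv1", ["lv1", "lv2"])]
# while PVs hosting at most one LV produce no entry.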


def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
                       get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hypervisor. Appends the results to the 'result' dict.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      hvparams = all_hvparams[hv_name]
      try:
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      result[constants.NV_HYPERVISOR][hv_name] = val


def _VerifyHvparams(what, vm_capable, result,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hvparams. Appends the results to the 'result' dict.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HVPARAMS in what:
    result[constants.NV_HVPARAMS] = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        get_hv_fn(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))


def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
  """Verifies the instance list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
                            all_hvparams=all_hvparams)
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val


def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
  """Verifies the node info.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_HVINFO in what and vm_capable:
    hvname = what[constants.NV_HVINFO]
    hyper = hypervisor.GetHypervisor(hvname)
    hvparams = all_hvparams[hvname]
    result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)


def _VerifyClientCertificate(cert_file=pathutils.NODED_CLIENT_CERT_FILE):
  """Verify the existence and validity of the client SSL certificate.

  """
  create_cert_cmd = "gnt-cluster renew-crypto --new-node-certificates"
  if not os.path.exists(cert_file):
    return (constants.CV_ERROR,
            "The client certificate does not exist. Run '%s' to create"
            " client certificates for all nodes." % create_cert_cmd)

  (errcode, msg) = utils.VerifyCertificate(cert_file)
  if errcode is not None:
    return (errcode, msg)
  else:
    # if everything is fine, we return the digest to be compared to the config
    return (None, utils.GetCertificateDigest(cert_filename=cert_file))


def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
  @type node_groups: a dict of strings
  @param node_groups: node _names_ mapped to their group uuids (it's enough to
      have only those nodes that are in `what["nodelist"]`)
  @type groups_cfg: a dict of dict of strings
  @param groups_cfg: a dictionary mapping group uuids to their configuration
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
  _VerifyHvparams(what, vm_capable, result)

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_CLIENT_CERT in what:
    result[constants.NV_CLIENT_CERT] = _VerifyClientCertificate()

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      params = groups_cfg.get(node_groups.get(node))
      ssh_port = params["ndparams"].get(constants.ND_SSH_PORT)
      logging.debug("Ssh port %s (None = default) for node %s",
                    str(ssh_port), node)
      success, message = _GetSshRunner(cluster_name). \
                            VerifyNodeHostname(node, ssh_port)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stating out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  _VerifyInstanceList(what, vm_capable, result, all_hvparams)

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_ACCEPTED_STORAGE_PATHS) == my_name:
    result[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        filestorage.ComputeWrongFileStoragePaths()

  if what.get(constants.NV_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_FILE_STORAGE_PATH] = pathresult

  if what.get(constants.NV_SHARED_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_SHARED_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_SHARED_FILE_STORAGE_PATH] = pathresult

  return result

    
1174

    
1175
def GetCryptoTokens(token_requests):
1176
  """Perform actions on the node's cryptographic tokens.
1177

1178
  Token types can be 'ssl' or 'ssh'. So far only some actions are implemented
1179
  for 'ssl'. Action 'get' returns the digest of the public client ssl
1180
  certificate. Action 'create' creates a new client certificate and private key
1181
  and also returns the digest of the certificate. The third parameter of a
1182
  token request are optional parameters for the actions, so far only the
1183
  filename is supported.
1184

1185
  @type token_requests: list of tuples of (string, string, dict), where the
1186
    first string is in constants.CRYPTO_TYPES, the second in
1187
    constants.CRYPTO_ACTIONS. The third parameter is a dictionary of string
1188
    to string.
1189
  @param token_requests: list of requests of cryptographic tokens and actions
1190
    to perform on them. The actions come with a dictionary of options.
1191
  @rtype: list of tuples (string, string)
1192
  @return: list of tuples of the token type and the public crypto token
1193

1194
  """
1195
  _VALID_CERT_FILES = [pathutils.NODED_CERT_FILE,
1196
                       pathutils.NODED_CLIENT_CERT_FILE,
1197
                       pathutils.NODED_CLIENT_CERT_FILE_TMP]
1198
  _DEFAULT_CERT_FILE = pathutils.NODED_CLIENT_CERT_FILE
1199
  tokens = []
1200
  for (token_type, action, options) in token_requests:
1201
    if token_type not in constants.CRYPTO_TYPES:
1202
      raise errors.ProgrammerError("Token type '%s' not supported." %
1203
                                   token_type)
1204
    if action not in constants.CRYPTO_ACTIONS:
1205
      raise errors.ProgrammerError("Action '%s' is not supported." %
1206
                                   action)
1207
    if token_type == constants.CRYPTO_TYPE_SSL_DIGEST:
1208
      if action == constants.CRYPTO_ACTION_CREATE:
1209

    
1210
        # extract file name from options
1211
        cert_filename = None
1212
        if options:
1213
          cert_filename = options.get(constants.CRYPTO_OPTION_CERT_FILE)
1214
        if not cert_filename:
1215
          cert_filename = _DEFAULT_CERT_FILE
1216
        # For security reason, we don't allow arbitrary filenames
1217
        if not cert_filename in _VALID_CERT_FILES:
1218
          raise errors.ProgrammerError(
1219
            "The certificate file name path '%s' is not allowed." %
1220
            cert_filename)
1221

    
1222
        # extract serial number from options
1223
        serial_no = None
1224
        if options:
1225
          try:
1226
            serial_no = int(options[constants.CRYPTO_OPTION_SERIAL_NO])
1227
          except ValueError:
1228
            raise errors.ProgrammerError(
1229
              "The given serial number is not an intenger: %s." %
1230
              options.get(constants.CRYPTO_OPTION_SERIAL_NO))
1231
          except KeyError:
1232
            raise errors.ProgrammerError("No serial number was provided.")
1233

    
1234
        if not serial_no:
1235
          raise errors.ProgrammerError(
1236
            "Cannot create an SSL certificate without a serial no.")
1237

    
1238
        utils.GenerateNewSslCert(
1239
          True, cert_filename, serial_no,
1240
          "Create new client SSL certificate in %s." % cert_filename)
1241
        tokens.append((token_type,
1242
                       utils.GetCertificateDigest(
1243
                         cert_filename=cert_filename)))
1244
      elif action == constants.CRYPTO_ACTION_GET:
1245
        tokens.append((token_type,
1246
                       utils.GetCertificateDigest()))
1247
  return tokens
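
# Illustrative request: GetCryptoTokens([(constants.CRYPTO_TYPE_SSL_DIGEST,
# constants.CRYPTO_ACTION_GET, None)]) returns a one-element list
# [(constants.CRYPTO_TYPE_SSL_DIGEST, <digest of the client certificate>)].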


def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with volume group names as keys and their
      size as values

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @raise RPCFail: if any of the given bridges is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceListForHypervisor(hname, hvparams=None,
                                 get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances of the given hypervisor.

  @type hname: string
  @param hname: name of the hypervisor
  @type hvparams: dict of strings
  @param hvparams: hypervisor parameters for the given hypervisor
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  try:
    hv = get_hv_fn(hname)
    names = hv.ListInstances(hvparams=hvparams)
    results.extend(names)
  except errors.HypervisorError, err:
    _Fail("Error enumerating instances (hypervisor %s): %s",
          hname, err, exc=True)
  return results


def GetInstanceList(hypervisor_list, all_hvparams=None,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor types to respective
    cluster-wide hypervisor parameters
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
                                                get_hv_fn=get_hv_fn))
  return results
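
# Illustrative call: GetInstanceList([constants.HT_XEN_PVM],
# all_hvparams={constants.HT_XEN_PVM: {}}) returns the names of the running
# instances as reported by that hypervisor, e.g. ["instance1.example.com"].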


def GetInstanceInfo(instance, hname, hvparams=None):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance
  @type hvparams: dict of strings
  @param hvparams: the instance's hvparams

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: state of instance (HvInstanceState)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance,
                                                          hvparams=hvparams)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["vcpus"] = iinfo[3]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @raise RPCFail: if the instance is not running; missing disk symlinks
      are only logged as warnings

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances(instance.hvparams):
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list, all_hvparams):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: mapping of hypervisor names to hvparams

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo(hvparams)
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def GetInstanceConsoleInfo(instance_param_dict,
                           get_hv_fn=hypervisor.GetHypervisor):
  """Gather data about the console access of a set of instances of this node.

  This function assumes that the caller already knows which instances are on
  this node, by calling a function such as L{GetAllInstancesInfo} or
  L{GetInstanceList}.

  For every instance, a large amount of configuration data needs to be
  provided to the hypervisor interface in order to receive the console
  information. Whether this could or should be cut down can be discussed.
  The information is provided in a dictionary indexed by instance name,
  allowing any number of instance queries to be done.

  @type instance_param_dict: dict of string to tuple of dictionaries, where the
    dictionaries represent: L{objects.Instance}, L{objects.Node},
    L{objects.NodeGroup}, HvParams, BeParams
  @param instance_param_dict: mapping of instance name to parameters necessary
    for console information retrieval

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - instance: instance name
      - kind: console kind
      - message: used with kind == CONS_MESSAGE, indicates console to be
                 unavailable, supplies error message
      - host: host to connect to
      - port: port to use
      - user: user for login
      - command: the command, broken into parts as an array
      - display: unknown, potentially unused?

  """

  output = {}
  for inst_name in instance_param_dict:
    instance = instance_param_dict[inst_name]["instance"]
    pnode = instance_param_dict[inst_name]["node"]
    group = instance_param_dict[inst_name]["group"]
    hvparams = instance_param_dict[inst_name]["hvParams"]
    beparams = instance_param_dict[inst_name]["beParams"]

    instance = objects.Instance.FromDict(instance)
    pnode = objects.Node.FromDict(pnode)
    group = objects.NodeGroup.FromDict(group)

    h = get_hv_fn(instance.hypervisor)
    output[inst_name] = h.GetInstanceConsole(instance, pnode, group,
                                             hvparams, beparams).ToDict()

  return output
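
# Expected shape of instance_param_dict (illustrative sketch; the nested
# dictionaries are the ToDict() serializations of the respective objects):
#   {"instance1.example.com": {"instance": inst.ToDict(),
#                              "node": pnode.ToDict(),
#                              "group": group.ToDict(),
#                              "hvParams": hvparams,
#                              "beParams": beparams}}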


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
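
# Example of a resulting log path (illustrative; the exact suffix comes
# from utils.TimestampForFilename()):
#   _InstanceLogName("add", "debootstrap", "instance1.example.com", None)
#   => pathutils.LOG_OS_DIR + "/add-debootstrap-instance1.example.com-TS.log"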


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Returns symlink path for block device.

  """
  if _dir is None:
    _dir = pathutils.DISK_LINKS_DIR

  return utils.PathJoin(_dir,
                        ("%s%s%s" %
                         (instance_name, constants.DISK_SEPARATOR, idx)))
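
# Example of the generated link path (illustrative; DISK_SEPARATOR is a
# build-time setting, typically ":"):
#   _GetBlockDevSymlinkPath("instance1.example.com", 0)
#   => pathutils.DISK_LINKS_DIR + "/instance1.example.com:0"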


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _CalculateDeviceURI(instance, disk, device):
  """Get the URI for the device.

  @type instance: L{objects.Instance}
  @param instance: the instance the disk belongs to
  @type disk: L{objects.Disk}
  @param disk: the target disk object
  @type device: L{bdev.BlockDev}
  @param device: the corresponding BlockDevice
  @rtype: string
  @return: the device uri if any else None

  """
  access_mode = disk.params.get(constants.LDP_ACCESS,
                                constants.DISK_KERNELSPACE)
  if access_mode == constants.DISK_USERSPACE:
    # This can raise errors.BlockDeviceError
    return device.GetUserspaceAccessUri(instance.hypervisor)
  else:
    return None


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, link_name, drive_uri)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)
    uri = _CalculateDeviceURI(instance, disk, device)

    block_devices.append((disk, link_name, uri))

  return block_devices


def StartInstance(instance, startup_paused, reason, store_reason=True):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @type reason: list of reasons
  @param reason: the reason trail for this startup
  @type store_reason: boolean
  @param store_reason: whether to store the startup reason trail on file
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
    if store_reason:
      _StoreInstReasonTrail(instance.name, reason)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout, reason, store_reason=True):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this shutdown
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances(instance.hvparams):
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances(instance.hvparams):
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
        if store_reason:
          _StoreInstReasonTrail(instance.name, reason)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances(instance.hvparams):
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances(instance.hvparams):
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances(instance.hvparams):
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)
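
# Retry behaviour of the shutdown above (descriptive note): the
# _TryShutdown callable is re-invoked by utils.Retry roughly every 5
# seconds until the instance disappears from the hypervisor's list or
# `timeout` seconds elapse; only then is a forced StopInstance() tried.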


def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this reboot
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
      result = StartInstance(instance, False, reason, store_reason=False)
      _StoreInstReasonTrail(instance.name, reason)
      return result
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances(instance.hvparams)
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(cluster_name, instance, target, live):
  """Migrates an instance to another node.

  @type cluster_name: string
  @param cluster_name: name of the cluster
  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(cluster_name, instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status.

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def HotplugDevice(instance, action, dev_type, device, extra, seq):
  """Hotplug a device

  Hotplug is currently supported only for KVM Hypervisor.

  @type instance: L{objects.Instance}
  @param instance: the instance to which we hotplug a device
  @type action: string
  @param action: the hotplug action to perform
  @type dev_type: string
  @param dev_type: the device type to hotplug
  @type device: either L{objects.NIC} or L{objects.Disk}
  @param device: the device object to hotplug
  @type extra: string
  @param extra: extra info used by hotplug code (e.g. disk link)
  @type seq: int
  @param seq: the index of the device from master perspective
  @raise RPCFail: in case instance does not have KVM hypervisor

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.VerifyHotplugSupport(instance, action, dev_type)
  except errors.HotplugError, err:
    _Fail("Hotplug is not supported: %s", err)

  if action == constants.HOTPLUG_ACTION_ADD:
    fn = hyper.HotAddDevice
  elif action == constants.HOTPLUG_ACTION_REMOVE:
    fn = hyper.HotDelDevice
  elif action == constants.HOTPLUG_ACTION_MODIFY:
    fn = hyper.HotModDevice
  else:
    assert action in constants.HOTPLUG_ALL_ACTIONS

  return fn(instance, dev_type, device, extra, seq)


def HotplugSupported(instance):
  """Checks if hotplug is generally supported.

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.HotplugSupported(instance)
  except errors.HotplugError, err:
    _Fail("Hotplug is not supported: %s", err)


def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
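
# Example of the command built above (illustrative device path; seek and
# count are in bs-sized blocks, i.e. MiB):
#   _WipeDevice("/dev/xenvg/disk0", 0, 1024) runs approximately:
#   dd if=/dev/zero seek=0 bs=1048576 oflag=direct \
#      of=/dev/xenvg/disk0 count=1024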


def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path

    def _TryRemove():
      try:
        rdev.Remove()
        return []
      except errors.BlockDeviceError, err:
        return [str(err)]

    msgs.extend(utils.SimpleRetry([], _TryRemove,
                                  constants.DISK_REMOVE_RETRY_INTERVAL,
                                  constants.DISK_REMOVE_RETRY_TIMEOUT))

    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result
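
# Degraded-assembly note (descriptive comment on the code above):
# ChildrenNeeded() == -1 means every child is required (no Nones are
# tolerated); otherwise up to len(disk.children) - ChildrenNeeded()
# children may fail to assemble before the exception is re-raised
# instead of being logged and skipped.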


def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: tuple
  @return: a tuple with the C{/dev/...} path and the created symlink
      for primary nodes, and (C{True}, C{True}) for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, BlockDev):
      # pylint: disable=E1103
      dev_path = result.dev_path
      link_name = None
      if as_primary:
        link_name = _SymlinkBlockDev(owner, dev_path, idx)
    elif result:
      return result, result
    else:
      _Fail("Unexpected result from _RecursiveAssembleBD")
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return dev_path, link_name


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such; as opposed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           sync status information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetdimensions(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the pair (size, spindles), where spindles is None if the
      device doesn't support that

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualDimensions())
  return result


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
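
# Example (illustrative): if the file named by constants.OS_API_FILE
# inside os_dir contains the lines "20" and "15", this function returns
# (True, [20, 15]); a missing or unreadable file yields a (False,
# error message) tuple instead.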


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj
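
# On-disk layout accepted above (descriptive sketch): every script in
# constants.OS_SCRIPTS must be a regular, executable file in the OS
# directory (the verify script is checked only for API >= 20, since it is
# dropped from os_files otherwise); the variants file is optional from
# API 15 on and the parameters file is required from API 20 on, exactly
# as encoded in the os_files dictionary.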


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result
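
# Example of the environment produced above (illustrative values for an
# OS "debootstrap+default" with a hypothetical OS parameter "dhcp"):
#   OS_API_VERSION=20, OS_NAME=debootstrap, OS_VARIANT=default,
#   OSP_DHCP=yes, DEBUG_LEVEL=0, PATH=constants.HOOKS_PATH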


def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    result["DISK_%d_UUID" % idx] = disk.uuid
    if disk.name:
      result["DISK_%d_NAME" % idx] = disk.name
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.DTS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type in constants.DTS_FILEBASED:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.logical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    result["NIC_%d_UUID" % idx] = nic.uuid
    if nic.name:
      result["NIC_%d_NAME" % idx] = nic.name
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.netinfo:
      nobj = objects.Network.FromDict(nic.netinfo)
      result.update(nobj.HooksDict("NIC_%d_" % idx))
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
3070

    
3071

    
3072
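# Illustrative sketch: on top of the OSCoreEnv entries, a single-disk,
# single-NIC instance would additionally yield keys like (values hypothetical):
#
#   INSTANCE_NAME=web1.example.com, HYPERVISOR=kvm, DISK_COUNT=1, NIC_COUNT=1,
#   DISK_0_PATH=/dev/drbd0, DISK_0_ACCESS=rw, NIC_0_MAC=aa:00:00:35:22:10,
#   NIC_0_MODE=bridged, NIC_0_BRIDGE=xen-br0
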
def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list of L{objects.ExtStorage}
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result


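# Illustrative sketch of consuming the tuples returned above (provider names
# are hypothetical):
#
#   for (name, path, status, diagnose, parameters) in DiagnoseExtStorage():
#     if status:
#       logging.info("provider %s at %s, parameters: %s",
#                    name, path, [p[0] for p in parameters])
#     else:
#       logging.warning("invalid provider %s: %s", name, diagnose)
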
def BlockdevGrow(disk, amount, dryrun, backingstore, excl_stor):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


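# Illustrative sketch: for a DRBD disk the caller is expected to grow the
# backing storage (the LVM children) before the logical device, roughly:
#
#   BlockdevGrow(disk, 1024, False, True, excl_stor)   # grow backing LVs
#   BlockdevGrow(disk, 1024, False, False, excl_stor)  # then resize DRBD
#
# The real sequencing lives in the master-side code; this is only a sketch.
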
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.DT_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.DT_PLAIN:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


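# Illustrative note: for a DT_DRBD8 disk the recursion above descends into the
# first child, so the snapshot is taken of the underlying plain LV and the
# returned ID names a new LV, e.g. a hypothetical ("xenvg", "inst1.snap") pair.
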
def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk whose metadata is to be set
  @type info: string
  @param info: new 'info' metadata
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
               "%s" % nic.network)
    config.set(constants.INISECT_INS, "nic%d_name" % nic_count,
               "%s" % nic.name)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.logical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))
      config.set(constants.INISECT_INS, "disk%d_name" % disk_count,
                 "%s" % disk.name)

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  config.add_section(constants.INISECT_OSP_PRIVATE)
  for name, value in instance.osparams_private.items():
    config.set(constants.INISECT_OSP_PRIVATE, name, str(value.Get()))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


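# Illustrative sketch of the resulting config.ini layout, assuming the
# conventional values of constants.INISECT_EXP/INISECT_INS (all values
# hypothetical):
#
#   [export]
#   version = 0
#   timestamp = 1367309869
#   source = node1.example.com
#   os = debootstrap+default
#   compression = none
#
#   [instance]
#   name = web1.example.com
#   maxmem = 512
#   nic_count = 1
#   disk_count = 1
#   ...
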
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id); disk is
      an L{objects.Disk} object describing the current disk, and new
      unique_id is the name we rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether a given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  filestorage.CheckFileStoragePath(fs_dir)

  return os.path.normpath(fs_dir)


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


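# Illustrative sketch, assuming a cluster-wide file storage root of
# /srv/ganeti/file-storage (hypothetical):
#
#   _TransformFileStorageDir("/srv/ganeti/file-storage/inst1")  # accepted
#   _TransformFileStorageDir("/etc")  # rejected by CheckFileStoragePath
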
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty; if not, the RPC call fails.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      which will contain the error details in case we
      failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters, some of which may be
                   private.
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY), 1)

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


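# Illustrative sketch: CreateX509Certificate(validity=3600) leaves behind a
# layout like (directory name is generated by tempfile.mkdtemp):
#
#   <CRYPTO_KEYS_DIR>/x509-<timestamp>-<random>/key   (mode 0400)
#   <CRYPTO_KEYS_DIR>/x509-<timestamp>-<random>/cert  (mode 0400)
#
# and returns only (name, cert_pem); the private key never leaves the node.
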
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we use nocreat to fail if the device is not already there or we pass a
      # wrong path; we use notrunc to avoid attempting to truncate an LV device
      suffix = utils.BuildShellCmd("| dd of=%s conv=nocreat,notrunc bs=%s",
                                   real_disk.dev_path,
                                   str(1024 * 1024)) # 1 MB

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      real_disk = _OpenRealBD(disk)
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


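# Illustrative sketch: for IEIO_RAW_DISK the prefix/suffix returned above wrap
# the transport in dd pipelines, e.g. for a hypothetical 1024 MiB LV:
#
#   export:  dd if=/dev/xenvg/disk0 bs=1048576 count=1024 | <transport>
#   import:  <transport> | dd of=/dev/xenvg/disk0 conv=nocreat,notrunc bs=1048576
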
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


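# Illustrative sketch of the assembled daemon invocation for an export (paths
# and values hypothetical):
#
#   <IMPORT_EXPORT_DAEMON> <status_dir>/status export \
#     --key=... --cert=... --ca=<status_dir>/ca \
#     --host=node2.example.com --port=1234 --ipv4 --connect-retries=... \
#     --cmd-prefix='dd if=... |'
#
# The returned basename of the status directory is the handle later passed to
# GetImportExportStatus/AbortImportExport/CleanupImportExport.
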
def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if a
           status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running, it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(disks):
  """Finds attached L{BlockDev}s for the given disks.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we need to find

  @return: list of L{BlockDev} objects or C{None} if a given disk
           was not found or was not attached.

  """
  bdevs = []

  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      if multimaster:
        # In the multimaster case we have to wait explicitly until
        # the resource is Connected and UpToDate/UpToDate, because
        # we promote *both nodes* to primary directly afterwards.
        # Being in resync is not enough, since there is a race during which we
        # may promote a node with an Outdated disk to primary, effectively
        # tearing down the connection.
        all_connected = (all_connected and
                         stats.is_connected and
                         stats.is_disk_uptodate and
                         stats.peer_disk_uptodate)
      else:
        all_connected = (all_connected and
                         (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


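# Illustrative note on the retry above: the (0.1, 1.5, 5.0) delay spec makes
# utils.Retry wait 0.1s after the first failed _Attach call, then grow the
# delay by a factor of 1.5 each time (0.1, 0.15, 0.225, ...) up to a cap of
# 5.0s, until the overall 2 * 60 s timeout expires.
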
def DrbdWaitSync(disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


def DrbdNeedsActivation(disks):
  """Checks which of the passed disks need activation and returns their UUIDs.

  """
  faulty_disks = []

  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      faulty_disks.append(disk)
      continue

    stats = rd.GetProcStatus()
    if stats.is_standalone or stats.is_diskless:
      faulty_disks.append(disk)

  return [disk.uuid for disk in faulty_disks]


def GetDrbdUsermodeHelper():
  """Returns the DRBD usermode helper currently configured.

  """
  try:
    return drbd.DRBD8.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type, hvparams=None):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode(hvparams=hvparams)


def _VerifyRestrictedCmdName(cmd):
  """Verifies a restricted command name.

  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  if not cmd.strip():
    return (False, "Missing command name")

  if os.path.basename(cmd) != cmd:
    return (False, "Invalid command name")

  if not constants.EXT_PLUGIN_MASK.match(cmd):
    return (False, "Command name contains forbidden characters")

  return (True, None)


def _CommonRestrictedCmdCheck(path, owner):
  """Common checks for restricted command file system directories and files.

  @type path: string
  @param path: Path to check
  @param owner: C{None} or tuple containing UID and GID
  @rtype: tuple; (boolean, string or C{os.stat} result)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's the result of C{os.stat}

  """
  if owner is None:
    # Default to root as owner
    owner = (0, 0)

  try:
    st = os.stat(path)
  except EnvironmentError, err:
    return (False, "Can't stat(2) '%s': %s" % (path, err))

  if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
    return (False, "Permissions on '%s' are too permissive" % path)

  if (st.st_uid, st.st_gid) != owner:
    (owner_uid, owner_gid) = owner
    return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))

  return (True, st)


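# Illustrative sketch: _RCMD_MAX_MODE equals 0755 (rwxr-xr-x), so the mask
# check above accepts at most that mode:
#
#   stat.S_IMODE(0755) & ~_RCMD_MAX_MODE  # -> 0, accepted
#   stat.S_IMODE(0775) & ~_RCMD_MAX_MODE  # -> 0020, group-writable, rejected
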
def _VerifyRestrictedCmdDirectory(path, _owner=None):
  """Verifies restricted command directory.

  @type path: string
  @param path: Path to check
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  (status, value) = _CommonRestrictedCmdCheck(path, _owner)

  if not status:
    return (False, value)

  if not stat.S_ISDIR(value.st_mode):
    return (False, "Path '%s' is not a directory" % path)

  return (True, None)


def _VerifyRestrictedCmd(path, cmd, _owner=None):
  """Verifies a whole restricted command and returns its executable filename.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise the second element is the
    absolute path to the executable

  """
  executable = utils.PathJoin(path, cmd)

  (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)

  if not status:
    return (False, msg)

  if not utils.IsExecutable(executable):
    return (False, "access(2) thinks '%s' can't be executed" % executable)

  return (True, executable)


def _PrepareRestrictedCmd(path, cmd,
                          _verify_dir=_VerifyRestrictedCmdDirectory,
                          _verify_name=_VerifyRestrictedCmdName,
                          _verify_cmd=_VerifyRestrictedCmd):
  """Performs a number of tests on a restricted command.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @return: Same as L{_VerifyRestrictedCmd}

  """
  # Verify the directory first
  (status, msg) = _verify_dir(path)
  if status:
    # Check command if everything was alright
    (status, msg) = _verify_name(cmd)

  if not status:
    return (False, msg)

  # Check actual executable
  return _verify_cmd(path, cmd)


def RunRestrictedCmd(cmd,
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
                     _sleep_fn=time.sleep,
                     _prepare_fn=_PrepareRestrictedCmd,
                     _runcmd_fn=utils.RunCmd,
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
  """Executes a restricted command after performing strict tests.

  @type cmd: string
  @param cmd: Command name
  @rtype: string
  @return: Command output
  @raise RPCFail: In case of an error

  """
  logging.info("Preparing to run restricted command '%s'", cmd)

  if not _enabled:
    _Fail("Restricted commands disabled at configure time")

  lock = None
  try:
    cmdresult = None
    try:
      lock = utils.FileLock.Open(_lock_file)
      lock.Exclusive(blocking=True, timeout=_lock_timeout)

      (status, value) = _prepare_fn(_path, cmd)

      if status:
        cmdresult = _runcmd_fn([value], env={}, reset_env=True,
                               postfork_fn=lambda _: lock.Unlock())
      else:
        logging.error(value)
    except Exception: # pylint: disable=W0703
      # Keep original error in log
      logging.exception("Caught exception")

    if cmdresult is None:
      logging.info("Sleeping for %0.1f seconds before returning",
                   _RCMD_INVALID_DELAY)
      _sleep_fn(_RCMD_INVALID_DELAY)

      # Do not include original error message in returned error
      _Fail("Executing command '%s' failed" % cmd)
    elif cmdresult.failed or cmdresult.fail_reason:
      _Fail("Restricted command '%s' failed: %s; output: %s",
            cmd, cmdresult.fail_reason, cmdresult.output)
    else:
      return cmdresult.output
  finally:
    if lock is not None:
      # Release lock at last
      lock.Close()
      lock = None


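# Illustrative usage sketch (command name hypothetical): given an executable
# <RESTRICTED_COMMANDS_DIR>/restart-xyz owned by root:root with mode 0755 or
# stricter, a caller simply gets its stdout back:
#
#   output = RunRestrictedCmd("restart-xyz")
#
# Any validation failure is reported only as a generic error, after the
# _RCMD_INVALID_DELAY pause, to avoid leaking details to the caller.
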
def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    logging.info("Received request to no longer pause watcher")
    utils.RemoveFile(_filename)
  else:
    logging.info("Received request to pause watcher until %s", until)

    if not ht.TNumber(until):
      _Fail("Duration must be numeric")

    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)


def ConfigureOVS(ovs_name, ovs_link):
  """Creates an OpenvSwitch on the node.

  This function sets up an OpenvSwitch on the node with the given name and
  connects it via a given eth device.

  @type ovs_name: string
  @param ovs_name: Name of the OpenvSwitch to create.
  @type ovs_link: None or string
  @param ovs_link: Ethernet device for outside connection (can be missing)

  """
  # Initialize the OpenvSwitch
  result = utils.RunCmd(["ovs-vsctl", "add-br", ovs_name])
  if result.failed:
    _Fail("Failed to create openvswitch. Script return value: %s, output: '%s'"
          % (result.exit_code, result.output), log=True)

  # And connect it to a physical interface, if given
  if ovs_link:
    result = utils.RunCmd(["ovs-vsctl", "add-port", ovs_name, ovs_link])
    if result.failed:
      _Fail("Failed to connect openvswitch to interface %s. Script return"
            " value: %s, output: '%s'" % (ovs_link, result.exit_code,
            result.output), log=True)


class HooksRunner(object):
4487
  """Hook runner.
4488

4489
  This class is instantiated on the node side (ganeti-noded) and not
4490
  on the master side.
4491

4492
  """
4493
  def __init__(self, hooks_base_dir=None):
4494
    """Constructor for hooks runner.
4495

4496
    @type hooks_base_dir: str or None
4497
    @param hooks_base_dir: if not None, this overrides the
4498
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
4499

4500
    """
4501
    if hooks_base_dir is None:
4502
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
4503
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
4504
    # constant
4505
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
4506

    
4507
  def RunLocalHooks(self, node_list, hpath, phase, env):
4508
    """Check that the hooks will be run only locally and then run them.
4509

4510
    """
4511
    assert len(node_list) == 1
4512
    node = node_list[0]
4513
    _, myself = ssconf.GetMasterAndMyself()
4514
    assert node == myself
4515

    
4516
    results = self.RunHooks(hpath, phase, env)
4517

    
4518
    # Return values in the form expected by HooksMaster
4519
    return {node: (None, False, results)}
4520

    
4521
  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, one of L{constants.HKR_SUCCESS},
        L{constants.HKR_FAIL} or L{constants.HKR_SKIP}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
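
  # Illustrative example (values invented): for hpath "instance-start" in the
  # pre phase, the scripts under <hooks dir>/instance-start-pre.d are executed
  # and a returned entry could look like
  #   ("instance-start-pre.d/01-example", constants.HKR_SUCCESS, "started")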


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata, ial_params):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data
    @type ial_params: list
    @param ial_params: the iallocator parameters

    @rtype: str
    @return: the stdout of the allocator script on success; failures
        are reported through L{_Fail}

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name] + ial_params)
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
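
  # Illustrative only: for a hypothetical allocator script "my-alloc", the
  # code above amounts to writing idata to a temporary file and running
  # roughly "my-alloc /tmp/ganeti-iallocator.XXXXXX <params...>", then
  # handing the script's stdout back to the caller.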


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath
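
  # For example (illustrative): "/dev/drbd0" becomes
  # "<pathutils.BDEV_CACHE_DIR>/bdev_drbd0", and "/dev/mapper/vg-lv" becomes
  # "<pathutils.BDEV_CACHE_DIR>/bdev_mapper_vg-lv".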
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache file for %s: %s",
                        dev_path, err)