lib/backend.py @ ab4b1cf2

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103,C0302

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint

# C0302: This module has become too big and should be split up


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti.storage import bdev
from ganeti.storage import drbd
from ganeti.storage import filestorage
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster
from ganeti import ht
from ganeti.storage.base import BlockDev
from ganeti.storage.drbd import DRBD8
from ganeti import hooksmaster


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
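# Example of a line this regex matches, as produced by
# "lvs --noheadings --separator=| -ovg_name,lv_name,lv_size,lv_attr"
# (values are illustrative only):
#   "  xenvg|test1|20.06|-wi-ao----"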

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _GetInstReasonFilename(instance_name):
  """Path of the file containing the reason of the instance status change.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)


def _StoreInstReasonTrail(instance_name, trail):
  """Serialize a reason trail related to an instance change of state to file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: None

  """
  json = serializer.DumpJson(trail)
  filename = _GetInstReasonFilename(instance_name)
  utils.WriteFile(filename, data=json)


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
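

# Illustrative sketch (not part of the daemon's API): how an RPC client
# could frame data so that _Decompress above accepts it. The size threshold
# below is an assumption for the example, not a value taken from this module.
def _ExampleCompressForRpc(data, _threshold=4096):
  """Return an (encoding, content) pair understood by L{_Decompress}."""
  if len(data) < _threshold:
    # small payloads are cheaper to send uncompressed
    return (constants.RPC_ENCODING_NONE, data)
  return (constants.RPC_ENCODING_ZLIB_BASE64,
          base64.b64encode(zlib.compress(data)))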


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterNodeName():
  """Returns the master node name.

  @rtype: string
  @return: name of the master node
  @raise RPCFail: in case of errors

  """
  try:
    return _GetConfig().GetMasterNode()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    _RunLocalHooks decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _CheckStorageParams(params, num_params):
  """Performs sanity checks for storage parameters.

  @type params: list
  @param params: list of storage parameters
  @type num_params: int
  @param num_params: expected number of parameters

  """
  if params is None:
    raise errors.ProgrammerError("No storage parameters for storage"
                                 " reporting were provided.")
  if not isinstance(params, list):
    raise errors.ProgrammerError("The storage parameters are not of type"
                                 " list: '%s'" % params)
  if not len(params) == num_params:
    raise errors.ProgrammerError("Did not receive the expected number of"
                                 " storage parameters: expected %s,"
                                 " received '%s'" % (num_params, len(params)))


def _CheckLvmStorageParams(params):
  """Performs a sanity check for the 'exclusive storage' flag.

  @see: C{_CheckStorageParams}

  """
  _CheckStorageParams(params, 1)
  excl_stor = params[0]
  if not isinstance(params[0], bool):
    raise errors.ProgrammerError("Exclusive storage parameter is not"
                                 " boolean: '%s'." % excl_stor)
  return excl_stor


def _GetLvmVgSpaceInfo(name, params):
  """Wrapper around C{_GetVgInfo} which checks the storage parameters.

  @type name: string
  @param name: name of the volume group
  @type params: list
  @param params: list of storage parameters, which in this case should
    contain only one, the exclusive storage flag

  """
  excl_stor = _CheckLvmStorageParams(params)
  return _GetVgInfo(name, excl_stor)


def _GetVgInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVGInfo):
  """Retrieves information about a LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = info_fn([name], excl_stor)
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "type": constants.ST_LVM_VG,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }
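
# Illustrative sketch: the info_fn parameter exists mainly for testability.
# A unit test could stub out the LVM query; the 2-tuple below only mimics
# the (free, size, ...) positions _GetVgInfo reads, it is not the full
# GetVGInfo result:
#
#   def _FakeVgInfo(names, excl_stor):
#     return [(1024.0, 2048.0)]
#
#   info = _GetVgInfo("xenvg", False, info_fn=_FakeVgInfo)
#   assert info["storage_free"] == 1024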


def _GetLvmPvSpaceInfo(name, params):
  """Wrapper around C{_GetVgSpindlesInfo} with sanity checks.

  @see: C{_GetLvmVgSpaceInfo}

  """
  excl_stor = _CheckLvmStorageParams(params)
  return _GetVgSpindlesInfo(name, excl_stor)


def _GetVgSpindlesInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVgSpindlesInfo):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary with keys "type", "name", "storage_free" and
      "storage_size", reporting the VG name plus free and total spindles

  """
  if excl_stor:
    (vg_free, vg_size) = info_fn(name)
  else:
    vg_free = 0
    vg_size = 0
  return {
    "type": constants.ST_LVM_PV,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }


def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total amount of ram in MiB
    - hv_version: the hypervisor version, if available

  @type hvparams: dict of string
  @param hvparams: the hypervisor's hvparams

  """
  return get_hv_fn(name).GetNodeInfo(hvparams=hvparams)


def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information for all hypervisors.

  See C{_GetHvInfo} for information on the output.

  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams

  """
  if hv_specs is None:
    return None

  result = []
  for hvname, hvparams in hv_specs:
    result.append(_GetHvInfo(hvname, hvparams, get_hv_fn))
  return result
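
# Example shape of hv_specs (hypervisor names are the constants.HT_*
# values; the hvparams shown are assumptions for illustration):
#   [(constants.HT_KVM, {"kernel_path": "/boot/vmlinuz"}),
#    (constants.HT_XEN_PVM, {})]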


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list of results.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(storage_units, hv_specs):
  """Gives back a tuple with different information about the node.

  @type storage_units: list of tuples (string, string, list)
  @param storage_units: List of tuples (storage unit, identifier, parameters) to
    ask for disk space information. In case of lvm-vg, the identifier is
    the VG name. The parameters can contain additional, storage-type-specific
    parameters, for example exclusive storage for lvm storage.
  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
  @rtype: tuple; (string, None/list, None/list)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  storage_info = _GetNamedNodeInfo(
    storage_units,
    (lambda (storage_type, storage_key, storage_params):
        _ApplyStorageInfoFunction(storage_type, storage_key, storage_params)))
  hv_info = _GetHvInfoAll(hv_specs)
  return (bootid, storage_info, hv_info)
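
# Illustrative call (names and parameters are made up): report space for
# volume group "xenvg" with exclusive storage disabled, plus KVM node info:
#   GetNodeInfo([(constants.ST_LVM_VG, "xenvg", [False])],
#               [(constants.HT_KVM, {})])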


def _GetFileStorageSpaceInfo(path, params):
  """Wrapper around filestorage.GetSpaceInfo.

  The purpose of this wrapper is to call filestorage.GetFileStorageSpaceInfo
  and ignore the *args parameter to not leak it into the filestorage
  module's code.

  @see: C{filestorage.GetFileStorageSpaceInfo} for description of the
    parameters.

  """
  _CheckStorageParams(params, 0)
  return filestorage.GetFileStorageSpaceInfo(path)


# FIXME: implement storage reporting for all missing storage types.
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: _GetFileStorageSpaceInfo,
  constants.ST_LVM_PV: _GetLvmPvSpaceInfo,
  constants.ST_LVM_VG: _GetLvmVgSpaceInfo,
  constants.ST_SHARED_FILE: None,
  constants.ST_RADOS: None,
}


def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the results of the application of the storage space function (see
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
    storage type
  @raises NotImplementedError: for storage types that don't support space
    reporting yet
  """
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
  if fn is not None:
    return fn(storage_key, *args)
  else:
    raise NotImplementedError


def _CheckExclusivePvs(pvi_list):
  """Check that PVs are not shared among LVs

  @type pvi_list: list of L{objects.LvmPvInfo} objects
  @param pvi_list: information about the PVs

  @rtype: list of tuples (string, list of strings)
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])

  """
  res = []
  for pvi in pvi_list:
    if len(pvi.lv_list) > 1:
      res.append((pvi.name, pvi.lv_list))
  return res


def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
                       get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hypervisor. Appends the results to the 'results' list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      hvparams = all_hvparams[hv_name]
      try:
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      result[constants.NV_HYPERVISOR][hv_name] = val


def _VerifyHvparams(what, vm_capable, result,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hvparams. Appends the results to the 'results' list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HVPARAMS in what:
    result[constants.NV_HVPARAMS] = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        get_hv_fn(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))


def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
  """Verifies the instance list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
                            all_hvparams=all_hvparams)
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val


def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
  """Verifies the node info.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_HVINFO in what and vm_capable:
    hvname = what[constants.NV_HVINFO]
    hyper = hypervisor.GetHypervisor(hvname)
    hvparams = all_hvparams[hvname]
    result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)


def _VerifyClientCertificate(cert_file=pathutils.NODED_CLIENT_CERT_FILE):
  """Verify the existence and validity of the client SSL certificate.

  """
  create_cert_cmd = "gnt-cluster renew-crypto --new-node-certificates"
  if not os.path.exists(cert_file):
    return (constants.CV_ERROR,
            "The client certificate does not exist. Run '%s' to create"
            " client certificates for all nodes." % create_cert_cmd)

  (errcode, msg) = utils.VerifyCertificate(cert_file)
  if errcode is not None:
    return (errcode, msg)
  else:
    # if everything is fine, we return the digest to be compared to the config
    return (None, utils.GetCertificateDigest(cert_filename=cert_file))


def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
  @type node_groups: a dict of strings
  @param node_groups: node _names_ mapped to their group uuids (it's enough to
      have only those nodes that are in `what["nodelist"]`)
  @type groups_cfg: a dict of dict of strings
  @param groups_cfg: a dictionary mapping group uuids to their configuration
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
  _VerifyHvparams(what, vm_capable, result)

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_CLIENT_CERT in what:
    result[constants.NV_CLIENT_CERT] = _VerifyClientCertificate()

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      params = groups_cfg.get(node_groups.get(node))
      ssh_port = params["ndparams"].get(constants.ND_SSH_PORT)
      logging.debug("Ssh port %s (None = default) for node %s",
                    str(ssh_port), node)
      success, message = _GetSshRunner(cluster_name). \
                            VerifyNodeHostname(node, ssh_port)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  _VerifyInstanceList(what, vm_capable, result, all_hvparams)

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_ACCEPTED_STORAGE_PATHS) == my_name:
    result[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        filestorage.ComputeWrongFileStoragePaths()

  if what.get(constants.NV_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_FILE_STORAGE_PATH] = pathresult

  if what.get(constants.NV_SHARED_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_SHARED_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_SHARED_FILE_STORAGE_PATH] = pathresult

  return result


def GetCryptoTokens(token_requests):
  """Perform actions on the node's cryptographic tokens.

  Token types can be 'ssl' or 'ssh'. So far only some actions are implemented
  for 'ssl'. Action 'get' returns the digest of the public client ssl
  certificate. Action 'create' creates a new client certificate and private key
  and also returns the digest of the certificate. The third element of a
  token request holds optional parameters for the action; so far only the
  filename is supported.

  @type token_requests: list of tuples of (string, string, dict), where the
    first string is in constants.CRYPTO_TYPES, the second in
    constants.CRYPTO_ACTIONS. The third parameter is a dictionary of string
    to string.
  @param token_requests: list of requests of cryptographic tokens and actions
    to perform on them. The actions come with a dictionary of options.
  @rtype: list of tuples (string, string)
  @return: list of tuples of the token type and the public crypto token

  """
  _VALID_CERT_FILES = [pathutils.NODED_CERT_FILE,
                       pathutils.NODED_CLIENT_CERT_FILE,
                       pathutils.NODED_CLIENT_CERT_FILE_TMP]
  _DEFAULT_CERT_FILE = pathutils.NODED_CLIENT_CERT_FILE
  tokens = []
  for (token_type, action, options) in token_requests:
    if token_type not in constants.CRYPTO_TYPES:
      raise errors.ProgrammerError("Token type '%s' not supported." %
                                   token_type)
    if action not in constants.CRYPTO_ACTIONS:
      raise errors.ProgrammerError("Action '%s' is not supported." %
                                   action)
    if token_type == constants.CRYPTO_TYPE_SSL_DIGEST:
      if action == constants.CRYPTO_ACTION_CREATE:

        # extract file name from options
        cert_filename = None
        if options:
          cert_filename = options.get(constants.CRYPTO_OPTION_CERT_FILE)
        if not cert_filename:
          cert_filename = _DEFAULT_CERT_FILE
        # For security reasons, we don't allow arbitrary filenames
        if not cert_filename in _VALID_CERT_FILES:
          raise errors.ProgrammerError(
            "The certificate file name path '%s' is not allowed." %
            cert_filename)

        # extract serial number from options
        serial_no = None
        if options:
          try:
            serial_no = int(options[constants.CRYPTO_OPTION_SERIAL_NO])
          except ValueError:
            raise errors.ProgrammerError(
              "The given serial number is not an integer: %s." %
              options.get(constants.CRYPTO_OPTION_SERIAL_NO))
          except KeyError:
            raise errors.ProgrammerError("No serial number was provided.")

        if not serial_no:
          raise errors.ProgrammerError(
            "Cannot create an SSL certificate without a serial no.")

        utils.GenerateNewSslCert(
          True, cert_filename, serial_no,
          "Create new client SSL certificate in %s." % cert_filename)
        tokens.append((token_type,
                       utils.GetCertificateDigest(
                         cert_filename=cert_filename)))
      elif action == constants.CRYPTO_ACTION_GET:
        tokens.append((token_type,
                       utils.GetCertificateDigest()))
  return tokens
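
# Illustrative request: fetch the digest of the current client certificate
# (the options dictionary is unused for the 'get' action, hence None):
#   GetCryptoTokens([(constants.CRYPTO_TYPE_SSL_DIGEST,
#                     constants.CRYPTO_ACTION_GET, None)])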


def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs
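
# Illustrative call (device names and size are made up):
#   GetBlockDevSizes(["/dev/sda", "/does/not/exist"])
# returns something like {"/dev/sda": 476940}; paths outside /dev, paths
# that cannot be stat()'ed and non-block devices are silently skipped.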


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs
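
# Note on the lv_attr checks above (standard LVM semantics, summarized here
# for reference): character 1 of the attribute string is the volume type
# ("v" = virtual), character 5 is the state ("a" = active, "-" = inactive)
# and character 6 tells whether the device is open ("o").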


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with volume group names as keys and their
      sizes as values

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs
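
# Note (illustrative): the "devices" column printed by lvs looks like
# "/dev/sda1(0),/dev/sdb1(512)"; parse_dev above strips the "(<extent>)"
# suffix so that each entry becomes a plain block device path.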


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceListForHypervisor(hname, hvparams=None,
                                 get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances of the given hypervisor.

  @type hname: string
  @param hname: name of the hypervisor
  @type hvparams: dict of strings
  @param hvparams: hypervisor parameters for the given hypervisor
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  try:
    hv = get_hv_fn(hname)
    names = hv.ListInstances(hvparams=hvparams)
    results.extend(names)
  except errors.HypervisorError, err:
    _Fail("Error enumerating instances (hypervisor %s): %s",
          hname, err, exc=True)
  return results


def GetInstanceList(hypervisor_list, all_hvparams=None,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query for instance data
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor types to respective
    cluster-wide hypervisor parameters
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
                                                get_hv_fn=get_hv_fn))
  return results
1461

    
1462

    
1463
def GetInstanceInfo(instance, hname, hvparams=None):
1464
  """Gives back the information about an instance as a dictionary.
1465

1466
  @type instance: string
1467
  @param instance: the instance name
1468
  @type hname: string
1469
  @param hname: the hypervisor type of the instance
1470
  @type hvparams: dict of strings
1471
  @param hvparams: the instance's hvparams
1472

1473
  @rtype: dict
1474
  @return: dictionary with the following keys:
1475
      - memory: memory size of instance (int)
1476
      - state: state of instance (HvInstanceState)
1477
      - time: cpu time of instance (float)
1478
      - vcpus: the number of vcpus (int)
1479

1480
  """
1481
  output = {}
1482

    
1483
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance,
1484
                                                          hvparams=hvparams)
1485
  if iinfo is not None:
1486
    output["memory"] = iinfo[2]
1487
    output["vcpus"] = iinfo[3]
1488
    output["state"] = iinfo[4]
1489
    output["time"] = iinfo[5]
1490

    
1491
  return output


def GetInstanceMigratable(instance):
  """Computes whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances(instance.hvparams):
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list, all_hvparams):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: mapping of hypervisor names to hvparams

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo(hvparams)
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output
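
# Example return value (illustrative sketch; the instance name, the figures
# and the Xen-style state string are all invented):
#   {"instance1.example.com": {"memory": 512, "vcpus": 1,
#                              "state": "-b----", "time": 12.5}}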


def GetInstanceConsoleInfo(instance_param_dict,
                           get_hv_fn=hypervisor.GetHypervisor):
  """Gather data about the console access of a set of instances of this node.

  This function assumes that the caller already knows which instances are on
  this node, by calling a function such as L{GetAllInstancesInfo} or
  L{GetInstanceList}.

  For every instance, a large amount of configuration data needs to be
  provided to the hypervisor interface in order to receive the console
  information. Whether this could or should be cut down can be discussed.
  The information is provided in a dictionary indexed by instance name,
  allowing any number of instance queries to be done.

  @type instance_param_dict: dict of string to tuple of dictionaries, where the
    dictionaries represent: L{objects.Instance}, L{objects.Node},
    L{objects.NodeGroup}, HvParams, BeParams
  @param instance_param_dict: mapping of instance name to parameters necessary
    for console information retrieval

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - instance: instance name
      - kind: console kind
      - message: used with kind == CONS_MESSAGE, indicates console to be
                 unavailable, supplies error message
      - host: host to connect to
      - port: port to use
      - user: user for login
      - command: the command, broken into parts as an array
      - display: unknown, potentially unused?

  """
  output = {}
  for inst_name in instance_param_dict:
    instance = instance_param_dict[inst_name]["instance"]
    pnode = instance_param_dict[inst_name]["node"]
    group = instance_param_dict[inst_name]["group"]
    hvparams = instance_param_dict[inst_name]["hvParams"]
    beparams = instance_param_dict[inst_name]["beParams"]

    instance = objects.Instance.FromDict(instance)
    pnode = objects.Node.FromDict(pnode)
    group = objects.NodeGroup.FromDict(group)

    h = get_hv_fn(instance.hypervisor)
    output[inst_name] = h.GetInstanceConsole(instance, pnode, group,
                                             hvparams, beparams).ToDict()

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
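
# Example result (illustrative sketch; the exact timestamp comes from
# utils.TimestampForFilename() and the directory from pathutils.LOG_OS_DIR):
#   _InstanceLogName("add", "debootstrap", "instance1.example.com", None)
#   # -> "<LOG_OS_DIR>/add-debootstrap-instance1.example.com-<timestamp>.log"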


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance whose OS rename script should be run
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Returns the symlink path for a block device.

  """
  if _dir is None:
    _dir = pathutils.DISK_LINKS_DIR

  return utils.PathJoin(_dir,
                        ("%s%s%s" %
                         (instance_name, constants.DISK_SEPARATOR, idx)))


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name
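
# Example path (illustrative sketch; the directory comes from
# pathutils.DISK_LINKS_DIR and the separator from constants.DISK_SEPARATOR):
#   _GetBlockDevSymlinkPath("instance1.example.com", 0)
#   # -> "<DISK_LINKS_DIR>/instance1.example.com<DISK_SEPARATOR>0"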


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _CalculateDeviceURI(instance, disk, device):
  """Get the URI for the device.

  @type instance: L{objects.Instance}
  @param instance: the instance to which the disk belongs
  @type disk: L{objects.Disk}
  @param disk: the target disk object
  @type device: L{bdev.BlockDev}
  @param device: the corresponding BlockDevice
  @rtype: string
  @return: the device URI if any, else None

  """
  access_mode = disk.params.get(constants.LDP_ACCESS,
                                constants.DISK_KERNELSPACE)
  if access_mode == constants.DISK_USERSPACE:
    # This can raise errors.BlockDeviceError
    return device.GetUserspaceAccessUri(instance.hypervisor)
  else:
    return None


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, link_name, drive_uri)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)
    uri = _CalculateDeviceURI(instance, disk, device)

    block_devices.append((disk, link_name, uri))

  return block_devices


def StartInstance(instance, startup_paused, reason, store_reason=True):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: whether to pause the instance at startup
  @type reason: list of reasons
  @param reason: the reason trail for this startup
  @type store_reason: boolean
  @param store_reason: whether to store the startup reason trail on file
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
    if store_reason:
      _StoreInstReasonTrail(instance.name, reason)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout, reason, store_reason=True):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this shutdown
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances(instance.hvparams):
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances(instance.hvparams):
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
        if store_reason:
          _StoreInstReasonTrail(instance.name, reason)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances(instance.hvparams):
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances(instance.hvparams):
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances(instance.hvparams):
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this reboot
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
      result = StartInstance(instance, False, reason, store_reason=False)
      _StoreInstReasonTrail(instance.name, reason)
      return result
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances(instance.hvparams)
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(cluster_name, instance, target, live):
  """Migrates an instance to another node.

  @type cluster_name: string
  @param cluster_name: name of the cluster
  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(cluster_name, instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def HotplugDevice(instance, action, dev_type, device, extra, seq):
  """Hotplug a device

  Hotplug is currently supported only for KVM Hypervisor.

  @type instance: L{objects.Instance}
  @param instance: the instance to which we hotplug a device
  @type action: string
  @param action: the hotplug action to perform
  @type dev_type: string
  @param dev_type: the device type to hotplug
  @type device: either L{objects.NIC} or L{objects.Disk}
  @param device: the device object to hotplug
  @type extra: string
  @param extra: extra info used by hotplug code (e.g. disk link)
  @type seq: int
  @param seq: the index of the device from master perspective
  @raise RPCFail: in case instance does not have KVM hypervisor

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.VerifyHotplugSupport(instance, action, dev_type)
  except errors.HotplugError, err:
    _Fail("Hotplug is not supported: %s", err)

  if action == constants.HOTPLUG_ACTION_ADD:
    fn = hyper.HotAddDevice
  elif action == constants.HOTPLUG_ACTION_REMOVE:
    fn = hyper.HotDelDevice
  elif action == constants.HOTPLUG_ACTION_MODIFY:
    fn = hyper.HotModDevice
  else:
    assert action in constants.HOTPLUG_ALL_ACTIONS

  return fn(instance, dev_type, device, extra, seq)


def HotplugSupported(instance):
  """Checks if hotplug is generally supported.

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.HotplugSupported(instance)
  except errors.HotplugError, err:
    _Fail("Hotplug is not supported: %s", err)


def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
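
# Example of the resulting command line (illustrative sketch; assumes
# constants.DD_CMD is "dd", a target device of /dev/ganeti/disk0, and a
# 1024 MiB wipe starting at offset 0):
#   dd if=/dev/zero seek=0 bs=1048576 oflag=direct of=/dev/ganeti/disk0 \
#     count=1024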


def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects whose sync we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success
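
# Example return value (illustrative sketch; the first disk was paused
# successfully, the second could not be found):
#   [(True, None),
#    (False, "Cannot change sync for device disk/1: device not found")]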


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path

    def _TryRemove():
      try:
        rdev.Remove()
        return []
      except errors.BlockDeviceError, err:
        return [str(err)]

    msgs.extend(utils.SimpleRetry([], _TryRemove,
                                  constants.DISK_REMOVE_RETRY_INTERVAL,
                                  constants.DISK_REMOVE_RETRY_TIMEOUT))

    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: tuple
  @return: a tuple with the C{/dev/...} path and the created symlink
      for primary nodes, and (C{True}, C{True}) for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, BlockDev):
      # pylint: disable=E1103
      dev_path = result.dev_path
      link_name = None
      if as_primary:
        link_name = _SymlinkBlockDev(owner, dev_path, idx)
    elif result:
      return result, result
    else:
      _Fail("Unexpected result from _RecursiveAssembleBD")
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return dev_path, link_name


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that, as opposed to
  assemble, we don't cache the children: shutting down a device
  doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result
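
# Example return value (illustrative sketch; the BlockDevStatus contents
# depend on what CombinedSyncStatus() reports for the given disk):
#   [(True, <objects.BlockDevStatus instance>),
#    (False, "Can't find device disk/1")]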


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetdimensions(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the pair (size, spindles), where spindles is None if the
      device doesn't support that

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualDimensions())
  return result


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)
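
# Example call (illustrative sketch; it assumes the target path is a member
# of _ALLOWED_UPLOAD_FILES and that "data" is in the compressed wire format
# expected by _Decompress):
#   UploadFile(pathutils.CLUSTER_CONF_FILE, data, 0600, "root", "root",
#              None, None)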


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout
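
# Example call (illustrative sketch; the helper path, command name and node
# name below are assumptions):
#   output = RunOob("/usr/lib/ganeti/oob-helper", "power-status",
#                   "node1.example.com", 60)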


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
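
# Example results (illustrative sketch; assumes an OS definition whose API
# version file contains the single line "20"):
#   _OSOndiskAPIVersion("/srv/ganeti/os/debootstrap")  # -> (True, [20])
#   # on failure the tuple is (False, "<error message>") instead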


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result
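
# Example of the resulting environment (illustrative sketch; assumes an OS
# named "debootstrap+default" with API version 20 and a single OS parameter):
#   OSCoreEnv("debootstrap+default", inst_os, {"dhcp": "yes"})
#   # -> {"OS_API_VERSION": "20", "OS_NAME": "debootstrap",
#   #     "DEBUG_LEVEL": "0", "OS_VARIANT": "default",
#   #     "OSP_DHCP": "yes", "PATH": constants.HOOKS_PATH}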
2999

    
3000

    
3001
def OSEnvironment(instance, inst_os, debug=0):
3002
  """Calculate the environment for an os script.
3003

3004
  @type instance: L{objects.Instance}
3005
  @param instance: target instance for the os script run
3006
  @type inst_os: L{objects.OS}
3007
  @param inst_os: operating system for which the environment is being built
3008
  @type debug: integer
3009
  @param debug: debug level (0 or 1, for OS Api 10)
3010
  @rtype: dict
3011
  @return: dict of environment variables
3012
  @raise errors.BlockDeviceError: if the block device
3013
      cannot be found
3014

3015
  """
3016
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
3017

    
3018
  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
3019
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
3020

    
3021
  result["HYPERVISOR"] = instance.hypervisor
3022
  result["DISK_COUNT"] = "%d" % len(instance.disks)
3023
  result["NIC_COUNT"] = "%d" % len(instance.nics)
3024
  result["INSTANCE_SECONDARY_NODES"] = \
3025
      ("%s" % " ".join(instance.secondary_nodes))
3026

    
3027
  # Disks
3028
  for idx, disk in enumerate(instance.disks):
3029
    real_disk = _OpenRealBD(disk)
3030
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
3031
    result["DISK_%d_ACCESS" % idx] = disk.mode
3032
    result["DISK_%d_UUID" % idx] = disk.uuid
3033
    if disk.name:
3034
      result["DISK_%d_NAME" % idx] = disk.name
3035
    if constants.HV_DISK_TYPE in instance.hvparams:
3036
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
3037
        instance.hvparams[constants.HV_DISK_TYPE]
3038
    if disk.dev_type in constants.DTS_BLOCK:
3039
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
3040
    elif disk.dev_type in constants.DTS_FILEBASED:
3041
      result["DISK_%d_BACKEND_TYPE" % idx] = \
3042
        "file:%s" % disk.logical_id[0]
3043

    
3044
  # NICs
3045
  for idx, nic in enumerate(instance.nics):
3046
    result["NIC_%d_MAC" % idx] = nic.mac
3047
    result["NIC_%d_UUID" % idx] = nic.uuid
3048
    if nic.name:
3049
      result["NIC_%d_NAME" % idx] = nic.name
3050
    if nic.ip:
3051
      result["NIC_%d_IP" % idx] = nic.ip
3052
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
3053
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3054
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
3055
    if nic.nicparams[constants.NIC_LINK]:
3056
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
3057
    if nic.netinfo:
3058
      nobj = objects.Network.FromDict(nic.netinfo)
3059
      result.update(nobj.HooksDict("NIC_%d_" % idx))
3060
    if constants.HV_NIC_TYPE in instance.hvparams:
3061
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
3062
        instance.hvparams[constants.HV_NIC_TYPE]
3063

    
3064
  # HV/BE params
3065
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
3066
    for key, value in source.items():
3067
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
3068

    
3069
  return result
3070

    
3071

    
3072
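# Illustrative sketch (values assumed): on top of the OSCoreEnv variables,
# an instance with one disk and one bridged NIC would additionally get
# e.g. INSTANCE_NAME, INSTANCE_PRIMARY_NODE, HYPERVISOR, DISK_COUNT=1,
# DISK_0_PATH, DISK_0_ACCESS, NIC_COUNT=1, NIC_0_MAC, NIC_0_MODE=bridged,
# NIC_0_BRIDGE, plus one INSTANCE_HV_*/INSTANCE_BE_* entry per parameter.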
def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list of L{objects.ExtStorage}
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result


def BlockdevGrow(disk, amount, dryrun, backingstore, excl_stor):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to be resized.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.DT_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.DT_PLAIN:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk on which to set the information
  @type info: string
  @param info: new 'info' metadata
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)


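# For orientation, the file written below is a plain INI-style config; an
# illustrative (assumed) excerpt could look like:
#   [export]
#   version = 0
#   timestamp = 1365000000
#   source = node1.example.com
#   os = debootstrap
#   compression = none
#   [instance]
#   name = inst1.example.com
#   maxmem = 512
#   ...
# The actual section names come from constants.INISECT_* and are not
# hard-coded here.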
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
               "%s" % nic.network)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.logical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id); disk is
      an L{objects.Disk} object describing the current disk, and new
      unique_id is the name we rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether the given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  filestorage.CheckFileStoragePath(fs_dir)

  return os.path.normpath(fs_dir)


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty; if not, the operation fails.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raise RPCFail: if the file is not valid

  """
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      contains the error details in case we failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


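# Illustrative result (values assumed): the function below returns a pair
# like ("x509-20130605125623-abc123", "-----BEGIN CERTIFICATE-----..."),
# where the first element names a fresh directory under C{cryptodir}
# holding the "key" and "cert" files; the private key itself is never
# returned.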
def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY), 1)

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


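# Illustrative example (path assumed): exporting disk data to the file
# /srv/ganeti/export/inst1.new/disk0 in IEIO_FILE mode yields prefix=None
# and suffix="< /srv/ganeti/export/inst1.new/disk0", i.e. the transport
# command gets its stdin redirected from that file, while an import would
# instead redirect its stdout into it (suffix="> ...").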
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we use nocreat to fail if the device is not already there or we pass a
      # wrong path; we use notrunc to avoid attempting to truncate an LV device
      suffix = utils.BuildShellCmd("| dd of=%s conv=nocreat,notrunc bs=%s",
                                   real_disk.dev_path,
                                   str(1024 * 1024)) # 1 MiB

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MiB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      real_disk = _OpenRealBD(disk)
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: a list with the state of each named import/export, or None if
           a status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(disks):
  """Finds attached L{BlockDev}s for the given disks.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we need to find

  @return: list of L{BlockDev} objects or C{None} if a given disk
           was not found or was not attached.

  """
  bdevs = []

  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      if multimaster:
        # In the multimaster case we have to wait explicitly until
        # the resource is Connected and UpToDate/UpToDate, because
        # we promote *both nodes* to primary directly afterwards.
        # Being in resync is not enough, since there is a race during which we
        # may promote a node with an Outdated disk to primary, effectively
        # tearing down the connection.
        all_connected = (all_connected and
                         stats.is_connected and
                         stats.is_disk_uptodate and
                         stats.peer_disk_uptodate)
      else:
        all_connected = (all_connected and
                         (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


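# The function below returns an (alldone, min_resync) pair: alldone is
# True once no device is still resyncing, and min_resync is the lowest
# sync percentage reported by any device (100 when none reports one).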
def DrbdWaitSync(disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


def DrbdNeedsActivation(disks):
  """Checks which of the passed disks needs activation and returns their UUIDs.

  """
  faulty_disks = []

  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      faulty_disks.append(disk)
      continue

    stats = rd.GetProcStatus()
    if stats.is_standalone or stats.is_diskless:
      faulty_disks.append(disk)

  return [disk.uuid for disk in faulty_disks]


def GetDrbdUsermodeHelper():
  """Returns the DRBD usermode helper currently configured.

  """
  try:
    return drbd.DRBD8.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type, hvparams=None):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode(hvparams=hvparams)


def _VerifyRestrictedCmdName(cmd):
  """Verifies a restricted command name.

  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  if not cmd.strip():
    return (False, "Missing command name")

  if os.path.basename(cmd) != cmd:
    return (False, "Invalid command name")

  if not constants.EXT_PLUGIN_MASK.match(cmd):
    return (False, "Command name contains forbidden characters")

  return (True, None)


def _CommonRestrictedCmdCheck(path, owner):
  """Common checks for restricted command file system directories and files.

  @type path: string
  @param path: Path to check
  @param owner: C{None} or tuple containing UID and GID
  @rtype: tuple; (boolean, string or C{os.stat} result)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's the result of C{os.stat}

  """
  if owner is None:
    # Default to root as owner
    owner = (0, 0)

  try:
    st = os.stat(path)
  except EnvironmentError, err:
    return (False, "Can't stat(2) '%s': %s" % (path, err))

  if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
    return (False, "Permissions on '%s' are too permissive" % path)

  if (st.st_uid, st.st_gid) != owner:
    (owner_uid, owner_gid) = owner
    return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))

  return (True, st)


def _VerifyRestrictedCmdDirectory(path, _owner=None):
  """Verifies restricted command directory.

  @type path: string
  @param path: Path to check
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  (status, value) = _CommonRestrictedCmdCheck(path, _owner)

  if not status:
    return (False, value)

  if not stat.S_ISDIR(value.st_mode):
    return (False, "Path '%s' is not a directory" % path)

  return (True, None)


def _VerifyRestrictedCmd(path, cmd, _owner=None):
  """Verifies a whole restricted command and returns its executable filename.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise the second element is the
    absolute path to the executable

  """
  executable = utils.PathJoin(path, cmd)

  (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)

  if not status:
    return (False, msg)

  if not utils.IsExecutable(executable):
    return (False, "access(2) thinks '%s' can't be executed" % executable)

  return (True, executable)


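# Illustrative example (paths assumed): with path="/etc/ganeti/restricted"
# and cmd="restart-xyz", the function below first verifies the directory,
# then the command name, and finally the executable itself, returning
# (True, "/etc/ganeti/restricted/restart-xyz") when all checks pass.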
def _PrepareRestrictedCmd(path, cmd,
                          _verify_dir=_VerifyRestrictedCmdDirectory,
                          _verify_name=_VerifyRestrictedCmdName,
                          _verify_cmd=_VerifyRestrictedCmd):
  """Performs a number of tests on a restricted command.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @return: Same as L{_VerifyRestrictedCmd}

  """
  # Verify the directory first
  (status, msg) = _verify_dir(path)
  if status:
    # Check command if everything was alright
    (status, msg) = _verify_name(cmd)

  if not status:
    return (False, msg)

  # Check actual executable
  return _verify_cmd(path, cmd)


def RunRestrictedCmd(cmd,
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
                     _sleep_fn=time.sleep,
                     _prepare_fn=_PrepareRestrictedCmd,
                     _runcmd_fn=utils.RunCmd,
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
  """Executes a restricted command after performing strict tests.

  @type cmd: string
  @param cmd: Command name
  @rtype: string
  @return: Command output
  @raise RPCFail: In case of an error

  """
  logging.info("Preparing to run restricted command '%s'", cmd)

  if not _enabled:
    _Fail("Restricted commands disabled at configure time")

  lock = None
  try:
    cmdresult = None
    try:
      lock = utils.FileLock.Open(_lock_file)
      lock.Exclusive(blocking=True, timeout=_lock_timeout)

      (status, value) = _prepare_fn(_path, cmd)

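      # Note: the lock is deliberately released in the parent right after
      # forking (via postfork_fn below), so that a long-running restricted
      # command does not keep other callers blocked on the lock file.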
      if status:
        cmdresult = _runcmd_fn([value], env={}, reset_env=True,
                               postfork_fn=lambda _: lock.Unlock())
      else:
        logging.error(value)
    except Exception: # pylint: disable=W0703
      # Keep original error in log
      logging.exception("Caught exception")

    if cmdresult is None:
      logging.info("Sleeping for %0.1f seconds before returning",
                   _RCMD_INVALID_DELAY)
      _sleep_fn(_RCMD_INVALID_DELAY)

      # Do not include original error message in returned error
      _Fail("Executing command '%s' failed" % cmd)
    elif cmdresult.failed or cmdresult.fail_reason:
      _Fail("Restricted command '%s' failed: %s; output: %s",
            cmd, cmdresult.fail_reason, cmdresult.output)
    else:
      return cmdresult.output
  finally:
    if lock is not None:
      # Finally, release the lock
      lock.Close()
      lock = None


def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    logging.info("Received request to no longer pause watcher")
    utils.RemoveFile(_filename)
  else:
    logging.info("Received request to pause watcher until %s", until)

    if not ht.TNumber(until):
      _Fail("Duration must be numeric")

    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)


def ConfigureOVS(ovs_name, ovs_link):
  """Creates an OpenvSwitch on the node.

  This function sets up an OpenvSwitch on the node with the given name
  and connects it via a given Ethernet device.

  @type ovs_name: string
  @param ovs_name: Name of the OpenvSwitch to create.
  @type ovs_link: None or string
  @param ovs_link: Ethernet device for outside connection (can be missing)

  """
  # Initialize the OpenvSwitch
  result = utils.RunCmd(["ovs-vsctl", "add-br", ovs_name])
  if result.failed:
    _Fail("Failed to create openvswitch. Script return value: %s, output: '%s'"
          % (result.exit_code, result.output), log=True)

  # And connect it to a physical interface, if given
  if ovs_link:
    result = utils.RunCmd(["ovs-vsctl", "add-port", ovs_name, ovs_link])
    if result.failed:
      _Fail("Failed to connect openvswitch to interface %s. Script return"
            " value: %s, output: '%s'" % (ovs_link, result.exit_code,
            result.output), log=True)


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

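  # Illustrative layout (directory name assumed): for hpath="instance-start"
  # and the "pre" phase, the method below executes every script found in
  #   <hooks_base_dir>/instance-start-pre.d/
  # run-parts style, passing only the supplied environment to each script.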
  def RunHooks(self, hpath, phase, env):
4513
    """Run the scripts in the hooks directory.
4514

4515
    @type hpath: str
4516
    @param hpath: the path to the hooks directory which
4517
        holds the scripts
4518
    @type phase: str
4519
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
4520
        L{constants.HOOKS_PHASE_POST}
4521
    @type env: dict
4522
    @param env: dictionary with the environment for the hook
4523
    @rtype: list
4524
    @return: list of 3-element tuples:
4525
      - script path
4526
      - script result, either L{constants.HKR_SUCCESS} or
4527
        L{constants.HKR_FAIL}
4528
      - output of the script
4529

4530
    @raise errors.ProgrammerError: for invalid input
4531
        parameters
4532

4533
    """
4534
    if phase == constants.HOOKS_PHASE_PRE:
4535
      suffix = "pre"
4536
    elif phase == constants.HOOKS_PHASE_POST:
4537
      suffix = "post"
4538
    else:
4539
      _Fail("Unknown hooks phase '%s'", phase)
4540

    
4541
    subdir = "%s-%s.d" % (hpath, suffix)
4542
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)
4543

    
4544
    results = []
4545

    
4546
    if not os.path.isdir(dir_name):
4547
      # for non-existing/non-dirs, we simply exit instead of logging a
4548
      # warning at every operation
4549
      return results
4550

    
4551
    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
4552

    
4553
    for (relname, relstatus, runresult) in runparts_results:
4554
      if relstatus == constants.RUNPARTS_SKIP:
4555
        rrval = constants.HKR_SKIP
4556
        output = ""
4557
      elif relstatus == constants.RUNPARTS_ERR:
4558
        rrval = constants.HKR_FAIL
4559
        output = "Hook script execution error: %s" % runresult
4560
      elif relstatus == constants.RUNPARTS_RUN:
4561
        if runresult.failed:
4562
          rrval = constants.HKR_FAIL
4563
        else:
4564
          rrval = constants.HKR_SUCCESS
4565
        output = utils.SafeEncode(runresult.output.strip())
4566
      results.append(("%s/%s" % (subdir, relname), rrval, output))
4567

    
4568
    return results
4569

    
4570

    
4571
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata, ial_params):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data
    @type ial_params: list
    @param ial_params: the iallocator parameters

    @rtype: str
    @return: the standard output of the allocator script; failures
       are reported by raising an exception via L{_Fail} instead of
       through a status/return value

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name] + ial_params)
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


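# Illustrative only (assumes an allocator script named "hail" is
# installed in one of the constants.IALLOCATOR_SEARCH_PATH directories;
# "request" stands for an already-built allocator request):
#
#   stdout = IAllocatorRunner.Run("hail", serializer.DumpJson(request), [])
#
# The input data is handed to the script as the name of a temporary
# file, followed by the extra parameters from ial_params.

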
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
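    # e.g. (illustrative): "/dev/disk/by-id/wwn-1" is converted to the
    # cache file "bdev_disk_by-id_wwn-1" under pathutils.BDEV_CACHE_DIR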
    return fpath
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache file for %s: %s",
                        dev_path, err)