#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint

import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti.storage import bdev
from ganeti.storage import drbd
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster
from ganeti import ht
from ganeti.storage.base import BlockDev
from ganeti.storage.drbd import DRBD8
from ganeti import hooksmaster

_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
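
# Illustrative note (not part of the original module): a typical "lvs" line
# matched by the regex above looks like
#   "  xenvg|test1|20.06|-wi-ao----"
# capturing the VG name, LV name, size in MiB and the lv_attr string.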

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
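
# Illustrative note (not part of the original module): the mode above is the
# conventional rwxr-xr-x permission set, i.e. _RCMD_MAX_MODE == 0755.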


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _GetInstReasonFilename(instance_name):
  """Path of the file containing the reason of the instance status change.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)


def _StoreInstReasonTrail(instance_name, trail):
  """Serialize a reason trail related to an instance change of state to file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: None

  """
  json = serializer.DumpJson(trail)
  filename = _GetInstReasonFilename(instance_name)
  utils.WriteFile(filename, data=json)
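
# Illustrative sketch (not part of the original module): a reason trail is a
# list of (source, reason, timestamp) entries, so the file written above may
# contain JSON along the lines of
#   [["gnt:client:gnt-instance", "user shutdown", 1234567890123456789]]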


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
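
# Illustrative sketch (not part of the original module): callers combine
# printf-style arguments with the "log"/"exc" keywords, e.g.
#   _Fail("Can't read file %s: %s", path, err, exc=True)  # log with traceback
#   _Fail("Soft failure on %s", node, log=False)          # raise, don't log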


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
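
# Illustrative sketch (not part of the original module): the two wire
# encodings accepted above can be produced like this:
#   plain = (constants.RPC_ENCODING_NONE, "hello")
#   packed = (constants.RPC_ENCODING_ZLIB_BASE64,
#             base64.b64encode(zlib.compress("hello")))
#   assert _Decompress(plain) == _Decompress(packed) == "hello"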


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    L{RunLocalHooks} decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)
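
# Illustrative sketch (not part of the original module): the setup script
# runs with a clean environment containing only the variables built by
# _BuildMasterIpEnv, roughly equivalent to the shell invocation
#   MASTER_NETDEV=eth0 MASTER_IP=192.0.2.1 MASTER_NETMASK=24 \
#   CLUSTER_IP_VERSION=4 <setup-script> start
# (the values shown are made up for the example).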


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller any failures; we need to
  # decide in which cases we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")
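
# Illustrative note (not part of the original module): the new
# address/netmask is added before the old one is removed so the master IP
# stays reachable throughout the change; the two commands above are the
# equivalent of, e.g.,
#   ip address add 192.0.2.1/17 dev eth0 label eth0:0
#   ip address del 192.0.2.1/16 dev eth0 label eth0:0
# (addresses and netmasks made up for the example).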


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate in; either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shut down the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _GetVgInfo(name, excl_stor):
  """Retrieves information about an LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = bdev.LogicalVolume.GetVGInfo([name], excl_stor)
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "name": name,
    "vg_free": vg_free,
    "vg_size": vg_size,
    }
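
# Illustrative sketch (not part of the original module): for a volume group
# named "xenvg" the helper above returns a dict shaped like
#   {"name": "xenvg", "vg_free": 1024, "vg_size": 4096}
# with sizes in MiB, or None values when the VG cannot be found.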


def _GetVgSpindlesInfo(name, excl_stor):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary whose keys are "name", "vg_free", "vg_size" for VG name,
      free spindles, total spindles respectively

  """
  if excl_stor:
    (vg_free, vg_size) = bdev.LogicalVolume.GetVgSpindlesInfo(name)
  else:
    vg_free = 0
    vg_size = 0
  return {
    "name": name,
    "vg_free": vg_free,
    "vg_size": vg_size,
    }


def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) RAM in MiB
    - memory_total is the total amount of RAM in MiB
    - hv_version: the hypervisor version, if available

  @type hvparams: dict of string
  @param hvparams: the hypervisor's hvparams

  """
  return get_hv_fn(name).GetNodeInfo(hvparams=hvparams)


def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information for all hypervisors.

  See C{_GetHvInfo} for information on the output.

  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams

  """
  if hv_specs is None:
    return None

  # FIXME: remove this fallback once all calls to call_node_info are fixed
  if (len(hv_specs) > 0) and isinstance(hv_specs[0], str):
    result = []
    for hvname in hv_specs:
      result.append(_GetHvInfo(hvname, None, get_hv_fn))
    return result

  result = []
  for hvname, hvparams in hv_specs:
    result.append(_GetHvInfo(hvname, hvparams, get_hv_fn))
  return result
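
# Illustrative sketch (not part of the original module): hv_specs pairs each
# hypervisor name with its cluster-wide parameters, e.g.
#   hv_specs = [("xen-pvm", {...}), ("kvm", {...})]
#   infos = _GetHvInfoAll(hv_specs)
# yielding one GetNodeInfo() result dict per hypervisor, in order.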


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list of results.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(storage_units, hv_specs, excl_stor):
  """Gives back a hash with different information about the node.

  @type storage_units: list of pairs (string, string)
  @param storage_units: List of pairs (storage unit, identifier) to ask for disk
                        space information. In case of lvm-vg, the identifier is
                        the VG name.
  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  storage_info = _GetNamedNodeInfo(
    storage_units,
    (lambda storage_unit: _ApplyStorageInfoFunction(storage_unit[0],
                                                    storage_unit[1],
                                                    excl_stor)))
  hv_info = _GetHvInfoAll(hv_specs)
  return (bootid, storage_info, hv_info)


# FIXME: implement storage reporting for all missing storage types.
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: None,
  constants.ST_LVM_PV: _GetVgSpindlesInfo,
  constants.ST_LVM_VG: _GetVgInfo,
  constants.ST_RADOS: None,
}


def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the results of the application of the storage space function (see
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
    storage type
  @raises NotImplementedError: for storage types that don't support space
    reporting yet
  """
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
  if fn is not None:
    return fn(storage_key, *args)
  else:
    raise NotImplementedError


def _CheckExclusivePvs(pvi_list):
  """Check that PVs are not shared among LVs

  @type pvi_list: list of L{objects.LvmPvInfo} objects
  @param pvi_list: information about the PVs

  @rtype: list of tuples (string, list of strings)
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])

  """
  res = []
  for pvi in pvi_list:
    if len(pvi.lv_list) > 1:
      res.append((pvi.name, pvi.lv_list))
  return res


def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
                       get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hypervisor. Appends the results to the 'result' dict.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      hvparams = all_hvparams[hv_name]
      try:
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      result[constants.NV_HYPERVISOR][hv_name] = val


def _VerifyHvparams(what, vm_capable, result,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hvparams. Appends the results to the 'result' dict.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HVPARAMS in what:
    result[constants.NV_HVPARAMS] = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        get_hv_fn(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))


def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
  """Verifies the instance list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
                            all_hvparams=all_hvparams)
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val


def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
  """Verifies the node info.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_HVINFO in what and vm_capable:
    hvname = what[constants.NV_HVINFO]
    hyper = hypervisor.GetHypervisor(hvname)
    hvparams = all_hvparams[hvname]
    result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)


def VerifyNode(what, cluster_name, all_hvparams):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
  _VerifyHvparams(what, vm_capable, result)

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  _VerifyInstanceList(what, vm_capable, result, all_hvparams)

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
    result[constants.NV_FILE_STORAGE_PATHS] = \
      bdev.ComputeWrongFileStoragePaths()

  return result
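
# Illustrative sketch (not part of the original module): a minimal
# verification request and its result could look like
#   what = {constants.NV_TIME: None, constants.NV_VERSION: None}
#   res = VerifyNode(what, "cluster.example.com", {})
#   # res[constants.NV_VERSION] == (PROTOCOL_VERSION, RELEASE_VERSION)
# (the full set of supported keys is given by the NV_* constants).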


def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs
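
# Illustrative note (not part of the original module): the attr string
# decoded above is LVM's lv_attr field; e.g. for "-wi-ao----",
# attr[0] == "-" (not a virtual volume), attr[4] == "a" (active, so
# inactive is False) and attr[5] == "o" (device open, so online is True).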


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary mapping volume group names to their size

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    Malformed lines in the C{lvs} output are logged and skipped; if the
    C{lvs} command itself fails, an L{RPCFail} is raised.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: None
  @raise RPCFail: if any of the bridges is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceListForHypervisor(hname, hvparams=None,
                                 get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances of the given hypervisor.

  @type hname: string
  @param hname: name of the hypervisor
  @type hvparams: dict of strings
  @param hvparams: hypervisor parameters for the given hypervisor
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  try:
    hv = get_hv_fn(hname)
    names = hv.ListInstances(hvparams=hvparams)
    results.extend(names)
  except errors.HypervisorError, err:
    _Fail("Error enumerating instances (hypervisor %s): %s",
          hname, err, exc=True)
  return results


def GetInstanceList(hypervisor_list, all_hvparams=None,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query for instance
    information
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor types to respective
    cluster-wide hypervisor parameters
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
                                                get_hv_fn=get_hv_fn))
  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["vcpus"] = iinfo[3]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Computes whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: None
  @raise RPCFail: if the instance is not running

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances(instance.hvparams):
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
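
# Illustrative sketch (not part of the original module): an "add" of instance
# "inst1.example.com" with OS "debootstrap" and no component yields a path
# such as
#   <LOG_OS_DIR>/add-debootstrap-inst1.example.com-<timestamp>.log
# where the timestamp comes from utils.TimestampForFilename().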


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS rename script is to be run
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Returns symlink path for block device.

  """
  if _dir is None:
    _dir = pathutils.DISK_LINKS_DIR

  return utils.PathJoin(_dir,
                        ("%s%s%s" %
                         (instance_name, constants.DISK_SEPARATOR, idx)))
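
# Illustrative sketch (not part of the original module): assuming the usual
# build-time DISK_SEPARATOR of ":", ("inst1.example.com", 0) maps to
#   <DISK_LINKS_DIR>/inst1.example.com:0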


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, link_name)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices
1552

    
1553

    
1554
def StartInstance(instance, startup_paused, reason, store_reason=True):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @type reason: list of reasons
  @param reason: the reason trail for this startup
  @type store_reason: boolean
  @param store_reason: whether to store the startup reason trail on file
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
    if store_reason:
      _StoreInstReasonTrail(instance.name, reason)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout, reason, store_reason=True):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this shutdown
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances(instance.hvparams):
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances(instance.hvparams):
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
        if store_reason:
          _StoreInstReasonTrail(instance.name, reason)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances(instance.hvparams):
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances(instance.hvparams):
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances(instance.hvparams):
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


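# A minimal sketch (an assumption, mirroring the _TryShutdown pattern above)
# of the utils.Retry protocol: the callable returns to end the loop, or
# raises utils.RetryAgain to be invoked again, until the timeout expires and
# utils.RetryTimeout is raised to the caller.
#
#   class _TryPoll:
#     def __call__(self):
#       if _ConditionHolds():      # hypothetical predicate
#         return
#       raise utils.RetryAgain()
#
#   utils.Retry(_TryPoll(), 5, timeout)   # poll every 5 seconds

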
def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this reboot
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
      result = StartInstance(instance, False, reason, store_reason=False)
      _StoreInstReasonTrail(instance.name, reason)
      return result
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances(instance.hvparams)
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually an IP), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)


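# For illustration (device path and sizes invented for the example): wiping
# 128 MiB starting at offset 0 expands to roughly
#
#   _WipeDevice("/dev/xenvg/disk0", 0, 128)
#   # runs: dd if=/dev/zero seek=0 bs=1048576 oflag=direct \
#   #       of=/dev/xenvg/disk0 count=128
#
# seek and count are measured in dd blocks, which is why the block size must
# stay at 1 MiB to match the MiB-based offset/size arguments.

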
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe are bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


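# Return shape, sketched with hypothetical disks: pausing sync on two disks
# where only the first one is found would yield something like
#
#   [(True, None),
#    (False, "Cannot change sync for device disk/1: device not found")]

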
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


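# Worked example for the "mcn" arithmetic above (numbers are illustrative):
# a mirrored device with 3 children that reports ChildrenNeeded() == 2 may
# tolerate len(children) - 2 == 1 missing child, so the first failed child
# is recorded as None and only a second failure re-raises. A ChildrenNeeded()
# of -1 means no Nones are allowed and any child failure propagates.

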
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such; as opposed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


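# Result shape, with hypothetical inputs: querying a healthy disk and a
# missing one would produce
#
#   [(True, <objects.BlockDevStatus>), (False, "Can't find device ...")]
#
# and, per the assertion above, the result always matches the input length.

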
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           sync status

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetdimensions(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the pair (size, spindles), where spindles is None if the
      device doesn't support that

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualDimensions())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt to truncate an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.SSH_LOGIN_USER,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


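# Sketch of the assembled pipeline (paths and host invented for the example;
# the real ssh invocation carries additional options from BuildCmd):
#
#   set -e; set -o pipefail; dd if=/dev/xenvg/disk0 bs=1048576 count=1024 \
#     | ssh ... dest-node 'dd of=/tmp/disk0 conv=nocreat,notrunc bs=65536 \
#       oflag=dsync'
#
# i.e. a 1 MiB-blocked local read piped into a smaller-blocked, synchronous
# remote write.

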
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


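# Example of what this parses (file contents are illustrative): an API file
# holding one integer per line, e.g.
#
#   20
#   15
#
# yields (True, [20, 15]); a missing file or a non-numeric line yields
# (False, "<error message>") instead.

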
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


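# For orientation, a valid API-20 OS directory looks roughly like this (the
# path and file names are illustrative; the real names come from
# constants.OS_API_FILE, constants.OS_SCRIPTS and the OS_VARIANTS_FILE/
# OS_PARAMETERS_FILE constants checked above):
#
#   /srv/ganeti/os/debootstrap/
#     ganeti_api_version    # e.g. containing "20"
#     create export import rename verify    # executable scripts
#     variants.list         # optional
#     parameters.list       # required for API >= 20

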
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result


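# Example of the resulting environment (values invented for illustration,
# assuming a "debootstrap" OS with a "default" variant):
#
#   OSCoreEnv("debootstrap+default", inst_os, {"dhcp": "yes"}, debug=1)
#   # -> {"OS_API_VERSION": "20", "OS_NAME": "debootstrap",
#   #     "DEBUG_LEVEL": "1", "OS_VARIANT": "default",
#   #     "OSP_DHCP": "yes", "PATH": constants.HOOKS_PATH}

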
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.netinfo:
      nobj = objects.Network.FromDict(nic.netinfo)
      result.update(nobj.HooksDict("NIC_%d_" % idx))
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list of L{objects.ExtStorage}
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result


def BlockdevGrow(disk, amount, dryrun, backingstore):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk to be changed
  @type info: string
  @param info: new 'info' metadata
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
               "%s" % nic.network)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


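# The written file is a plain INI document; a trimmed example (values
# invented for illustration, section names assuming the stock INISECT_*
# values):
#
#   [export]
#   version = 0
#   os = debootstrap
#   compression = none
#
#   [instance]
#   name = inst1.example.com
#   disk_count = 1
#   disk0_size = 1024

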
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
3043
  """Remove an existing export from the node.
3044

3045
  @type export: str
3046
  @param export: the name of the export to remove
3047
  @rtype: None
3048

3049
  """
3050
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)
3051

    
3052
  try:
3053
    shutil.rmtree(target)
3054
  except EnvironmentError, err:
3055
    _Fail("Error while removing the export: %s", err, exc=True)
3056

    
3057

    
3058
def BlockdevRename(devlist):
3059
  """Rename a list of block devices.
3060

3061
  @type devlist: list of tuples
3062
  @param devlist: list of tuples of the form  (disk,
3063
      new_logical_id, new_physical_id); disk is an
3064
      L{objects.Disk} object describing the current disk,
3065
      and new logical_id/physical_id is the name we
3066
      rename it to
3067
  @rtype: boolean
3068
  @return: True if all renames succeeded, False otherwise
3069

3070
  """
3071
  msgs = []
3072
  result = True
3073
  for disk, unique_id in devlist:
3074
    dev = _RecursiveFindBD(disk)
3075
    if dev is None:
3076
      msgs.append("Can't find device %s in rename" % str(disk))
3077
      result = False
3078
      continue
3079
    try:
3080
      old_rpath = dev.dev_path
3081
      dev.Rename(unique_id)
3082
      new_rpath = dev.dev_path
3083
      if old_rpath != new_rpath:
3084
        DevCacheManager.RemoveCache(old_rpath)
3085
        # FIXME: we should add the new cache information here, like:
3086
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
3087
        # but we don't have the owner here - maybe parse from existing
3088
        # cache? for now, we only lose lvm data when we rename, which
3089
        # is less critical than DRBD or MD
3090
    except errors.BlockDeviceError, err:
3091
      msgs.append("Can't rename device '%s' to '%s': %s" %
3092
                  (dev, unique_id, err))
3093
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
3094
      result = False
3095
  if not result:
3096
    _Fail("; ".join(msgs))
3097

    
3098

    
3099
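
# A note on the devlist format accepted by BlockdevRename above: for an
# LVM-backed disk the unique_id is a (vg_name, lv_name) pair, so a rename
# request could look like this (illustrative values only):
#
#   BlockdevRename([(disk_obj, ("xenvg", "new-lv-name"))])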
def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not (constants.ENABLE_FILE_STORAGE or
          constants.ENABLE_SHARED_FILE_STORAGE):
    _Fail("File storage disabled at configure time")

  bdev.CheckFileStoragePath(fs_dir)

  return os.path.normpath(fs_dir)


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: None
  @raises RPCFail: if the path exists but is not a directory, or if
      the directory cannot be created

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, fail the RPC call.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: None
  @raises RPCFail: if the path is not a directory or cannot be removed

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: None
  @raises RPCFail: if the rename fails or the paths are invalid

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: None
  @raises RPCFail: if closing any of the devices fails; the error
      message will contain the details of the failure

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise

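
# CreateX509Certificate above stores each certificate in its own
# subdirectory of the crypto directory, named after the mkdtemp result,
# e.g. (timestamp and suffix illustrative):
#
#   <CRYPTO_KEYS_DIR>/x509-2013-07-01_12_00_00-abcdef/key
#   <CRYPTO_KEYS_DIR>/x509-2013-07-01_12_00_00-abcdef/cert
#
# The returned "name" is just that directory's basename, which is what
# RemoveX509Certificate below expects as its argument.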
def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes a X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will be mostly ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc
      # to not attempt truncation on an LV device; we use oflag=dsync to not
      # buffer too much memory; this means that at best, we flush every 64k,
      # which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)

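
# To illustrate how the (prefix, suffix) pair computed above is used: for
# a raw-disk export of a 1024 MiB disk the daemon ends up running roughly
# (device path illustrative)
#
#   dd if=/dev/xenvg/inst1 bs=1048576 count=1024 | <network transport>
#
# while the matching raw-disk import runs
#
#   <network transport> | dd of=... conv=nocreat,notrunc bs=65536 oflag=dsync
#
# The prefix is glued in front of the transport command and the suffix
# after it, via the --cmd-prefix/--cmd-suffix options passed in
# StartImportExportDaemon below.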
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise

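
# Putting it together, the export daemon started above receives a command
# line along these lines (all values illustrative):
#
#   <IMPORT_EXPORT_DAEMON> <status_dir>/status export \
#     --key=... --cert=... --ca=... --host=198.51.100.1 --port=1234 \
#     --ipv4 --cmd-prefix='dd if=... bs=1048576 count=1024 |' \
#     --connect-retries=... --connect-timeout=...
#
# The caller only gets back the basename of the status directory; that
# name is the handle used by GetImportExportStatus, AbortImportExport and
# CleanupImportExport below.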
def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if a
           status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

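  # Note on the schedule below: the (0.1, 1.5, 5.0) tuple handed to
  # utils.Retry reads as (start, factor, maximum), so the wait between
  # _Attach attempts starts at 0.1s and grows by a factor of 1.5 up to
  # 5.0s, within an overall timeout of 2 * 60 seconds.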
  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)

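
# The (alldone, min_resync) pair returned above reads, e.g., as
# (False, 73.5): at least one device is still resyncing and the least
# advanced one is at 73.5% (value illustrative); (True, 100) means all
# devices are connected and fully synchronized.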
def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
  try:
    return drbd.DRBD8.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


def _VerifyRestrictedCmdName(cmd):
  """Verifies a restricted command name.

  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  if not cmd.strip():
    return (False, "Missing command name")

  if os.path.basename(cmd) != cmd:
    return (False, "Invalid command name")

  if not constants.EXT_PLUGIN_MASK.match(cmd):
    return (False, "Command name contains forbidden characters")

  return (True, None)


def _CommonRestrictedCmdCheck(path, owner):
  """Common checks for restricted command file system directories and files.

  @type path: string
  @param path: Path to check
  @param owner: C{None} or tuple containing UID and GID
  @rtype: tuple; (boolean, string or C{os.stat} result)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's the result of C{os.stat}

  """
  if owner is None:
    # Default to root as owner
    owner = (0, 0)

  try:
    st = os.stat(path)
  except EnvironmentError, err:
    return (False, "Can't stat(2) '%s': %s" % (path, err))

  if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
    return (False, "Permissions on '%s' are too permissive" % path)

  if (st.st_uid, st.st_gid) != owner:
    (owner_uid, owner_gid) = owner
    return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))

  return (True, st)

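
# A worked example for the permission check above: _RCMD_MAX_MODE is 0755,
# so a file with mode 0755 passes (0755 & ~0755 == 0), while group-writable
# 0775 is rejected because 0775 & ~0755 leaves the group-write bit (0020)
# set.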
def _VerifyRestrictedCmdDirectory(path, _owner=None):
  """Verifies restricted command directory.

  @type path: string
  @param path: Path to check
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  (status, value) = _CommonRestrictedCmdCheck(path, _owner)

  if not status:
    return (False, value)

  if not stat.S_ISDIR(value.st_mode):
    return (False, "Path '%s' is not a directory" % path)

  return (True, None)


def _VerifyRestrictedCmd(path, cmd, _owner=None):
  """Verifies a whole restricted command and returns its executable filename.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise the second element is the
    absolute path to the executable

  """
  executable = utils.PathJoin(path, cmd)

  (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)

  if not status:
    return (False, msg)

  if not utils.IsExecutable(executable):
    return (False, "access(2) thinks '%s' can't be executed" % executable)

  return (True, executable)


def _PrepareRestrictedCmd(path, cmd,
                          _verify_dir=_VerifyRestrictedCmdDirectory,
                          _verify_name=_VerifyRestrictedCmdName,
                          _verify_cmd=_VerifyRestrictedCmd):
  """Performs a number of tests on a restricted command.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @return: Same as L{_VerifyRestrictedCmd}

  """
  # Verify the directory first
  (status, msg) = _verify_dir(path)
  if status:
    # Check command if everything was alright
    (status, msg) = _verify_name(cmd)

  if not status:
    return (False, msg)

  # Check actual executable
  return _verify_cmd(path, cmd)

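
# Summarizing the chain above: the commands directory must be root-owned
# with mode at most 0755, the requested name must be a plain basename
# matching constants.EXT_PLUGIN_MASK, and the executable itself must pass
# the same ownership/permission test plus an access(2) check. Only then
# does e.g. (hypothetical command name)
#
#   _PrepareRestrictedCmd(pathutils.RESTRICTED_COMMANDS_DIR, "fsck-all")
#
# return (True, "<dir>/fsck-all") instead of (False, <error message>).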
def RunRestrictedCmd(cmd,
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
                     _sleep_fn=time.sleep,
                     _prepare_fn=_PrepareRestrictedCmd,
                     _runcmd_fn=utils.RunCmd,
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
  """Executes a restricted command after performing strict tests.

  @type cmd: string
  @param cmd: Command name
  @rtype: string
  @return: Command output
  @raise RPCFail: In case of an error

  """
  logging.info("Preparing to run restricted command '%s'", cmd)

  if not _enabled:
    _Fail("Restricted commands disabled at configure time")

  lock = None
  try:
    cmdresult = None
    try:
      lock = utils.FileLock.Open(_lock_file)
      lock.Exclusive(blocking=True, timeout=_lock_timeout)

      (status, value) = _prepare_fn(_path, cmd)

      if status:
        cmdresult = _runcmd_fn([value], env={}, reset_env=True,
                               postfork_fn=lambda _: lock.Unlock())
      else:
        logging.error(value)
    except Exception: # pylint: disable=W0703
      # Keep original error in log
      logging.exception("Caught exception")

    if cmdresult is None:
      logging.info("Sleeping for %0.1f seconds before returning",
                   _RCMD_INVALID_DELAY)
      _sleep_fn(_RCMD_INVALID_DELAY)

      # Do not include original error message in returned error
      _Fail("Executing command '%s' failed" % cmd)
    elif cmdresult.failed or cmdresult.fail_reason:
      _Fail("Restricted command '%s' failed: %s; output: %s",
            cmd, cmdresult.fail_reason, cmdresult.output)
    else:
      return cmdresult.output
  finally:
    if lock is not None:
      # Release the lock in the end
      lock.Close()
      lock = None


def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    logging.info("Received request to no longer pause watcher")
    utils.RemoveFile(_filename)
  else:
    logging.info("Received request to pause watcher until %s", until)

    if not ht.TNumber(until):
      _Fail("Duration must be numeric")

    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)

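
# Usage sketch for SetWatcherPause above (values illustrative): the
# argument is an absolute Unix timestamp, not a duration:
#
#   SetWatcherPause(time.time() + 3600)  # pause the watcher for one hour
#   SetWatcherPause(None)                # let the watcher run again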
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

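    # Hooks live in per-operation subdirectories: e.g. for an hpath of
    # "instance-start" (illustrative) and the "pre" phase, the directory
    # scanned below is <hooks_base_dir>/instance-start-pre.d/.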
    subdir = "%s-%s.d" % (hpath, suffix)
4207
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)
4208

    
4209
    results = []
4210

    
4211
    if not os.path.isdir(dir_name):
4212
      # for non-existing/non-dirs, we simply exit instead of logging a
4213
      # warning at every operation
4214
      return results
4215

    
4216
    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
4217

    
4218
    for (relname, relstatus, runresult) in runparts_results:
4219
      if relstatus == constants.RUNPARTS_SKIP:
4220
        rrval = constants.HKR_SKIP
4221
        output = ""
4222
      elif relstatus == constants.RUNPARTS_ERR:
4223
        rrval = constants.HKR_FAIL
4224
        output = "Hook script execution error: %s" % runresult
4225
      elif relstatus == constants.RUNPARTS_RUN:
4226
        if runresult.failed:
4227
          rrval = constants.HKR_FAIL
4228
        else:
4229
          rrval = constants.HKR_SUCCESS
4230
        output = utils.SafeEncode(runresult.output.strip())
4231
      results.append(("%s/%s" % (subdir, relname), rrval, output))
4232

    
4233
    return results
4234

    
4235

    
4236
class IAllocatorRunner(object):
4237
  """IAllocator runner.
4238

4239
  This class is instantiated on the node side (ganeti-noded) and not on
4240
  the master side.
4241

4242
  """
4243
  @staticmethod
4244
  def Run(name, idata):
4245
    """Run an iallocator script.
4246

4247
    @type name: str
4248
    @param name: the iallocator script name
4249
    @type idata: str
4250
    @param idata: the allocator input data
4251

4252
    @rtype: tuple
4253
    @return: two element tuple of:
4254
       - status
4255
       - either error message or stdout of allocator (for success)
4256

4257
    """
4258
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
4259
                                  os.path.isfile)
4260
    if alloc_script is None:
4261
      _Fail("iallocator module '%s' not found in the search path", name)
4262

    
4263
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
4264
    try:
4265
      os.write(fd, idata)
4266
      os.close(fd)
4267
      result = utils.RunCmd([alloc_script, fin_name])
4268
      if result.failed:
4269
        _Fail("iallocator module '%s' failed: %s, output '%s'",
4270
              name, result.fail_reason, result.output)
4271
    finally:
4272
      os.unlink(fin_name)
4273

    
4274
    return result.stdout
4275

    
4276

    
4277
class DevCacheManager(object):
4278
  """Simple class for managing a cache of block device information.
4279

4280
  """
4281
  _DEV_PREFIX = "/dev/"
4282
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR
4283

    
4284
  @classmethod
4285
  def _ConvertPath(cls, dev_path):
4286
    """Converts a /dev/name path to the cache file name.
4287

4288
    This replaces slashes with underscores and strips the /dev
4289
    prefix. It then returns the full path to the cache file.
4290

4291
    @type dev_path: str
4292
    @param dev_path: the C{/dev/} path name
4293
    @rtype: str
4294
    @return: the converted path name
4295

4296
    """
4297
    if dev_path.startswith(cls._DEV_PREFIX):
4298
      dev_path = dev_path[len(cls._DEV_PREFIX):]
4299
    dev_path = dev_path.replace("/", "_")
4300
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
4301
    return fpath
4302

    
4303
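  # _ConvertPath examples (paths illustrative): "/dev/drbd0" maps to
  # "<BDEV_CACHE_DIR>/bdev_drbd0", and "/dev/xenvg/inst1-disk0" to
  # "<BDEV_CACHE_DIR>/bdev_xenvg_inst1-disk0"; paths outside /dev keep
  # their full (slash-converted) name.
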
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)