#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti.storage import bdev
from ganeti.storage import drbd
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster
from ganeti import ht
from ganeti.storage.base import BlockDev
from ganeti.storage.drbd import DRBD8
from ganeti import hooksmaster


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
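
# Illustrative example (not part of the original module): a line as produced
# by "lvs --noheadings --separator=|" that this regex is meant to match:
#
#   xenvg|test1|20.06|-wi-ao----
#
# parses into the groups ("xenvg", "test1", "20.06", "-wi-ao----"); see
# GetVolumeList below for the consumer.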

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _GetInstReasonFilename(instance_name):
  """Path of the file containing the reason of the instance status change.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)


def _StoreInstReasonTrail(instance_name, trail):
  """Serialize a reason trail related to an instance change of state to file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: None

  """
  json = serializer.DumpJson(trail)
  filename = _GetInstReasonFilename(instance_name)
  utils.WriteFile(filename, data=json)
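
# Illustrative call (a sketch; the trail format is defined by the callers,
# typically a list of (source, reason, timestamp) entries):
#
#   _StoreInstReasonTrail("instance1.example.com",
#                         [["gnt:client:gnt-instance", "start", 1373290143]])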


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
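
# Illustrative usage (sketch): positional arguments are %-formatted into the
# message, exc=True additionally logs the active exception's traceback, and
# log=False raises without logging:
#
#   _Fail("Instance %s is not running", iname)
#   _Fail("Hypervisor error: %s", err, exc=True)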


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
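
# Minimal sketch of the wire format handled above (illustrative only):
#
#   packed = (constants.RPC_ENCODING_ZLIB_BASE64,
#             base64.b64encode(zlib.compress("some data")))
#   assert _Decompress(packed) == "some data"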


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
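
# Example from this module (see JobQueuePurge below): wipe the queue
# directory while keeping the lock file in place:
#
#   _CleanDirectory(pathutils.QUEUE_DIR,
#                   exclude=[pathutils.JOB_QUEUE_LOCK_FILE])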


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: tuple
  @return: True, None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)
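
# Illustrative unpacking of the return value (sketch, not original code):
#
#   (master_netdev, master_ip, master_node,
#    primary_ip_family, master_netmask) = GetMasterInfo()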


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    L{RunLocalHooks} decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate in; either add or remove an entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shut down the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _GetVgInfo(name, excl_stor):
  """Retrieves information about an LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = bdev.LogicalVolume.GetVGInfo([name], excl_stor)
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "name": name,
    "vg_free": vg_free,
    "vg_size": vg_size,
    }


def _GetVgSpindlesInfo(name, excl_stor):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary whose keys are "name", "vg_free", "vg_size" for VG name,
      free spindles, total spindles respectively

  """
  if excl_stor:
    (vg_free, vg_size) = bdev.LogicalVolume.GetVgSpindlesInfo(name)
  else:
    vg_free = 0
    vg_size = 0
  return {
    "name": name,
    "vg_free": vg_free,
    "vg_size": vg_size,
    }


def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total amount of ram in MiB
    - hv_version: the hypervisor version, if available

  @type hvparams: dict of string
  @param hvparams: the hypervisor's hvparams

  """
  return get_hv_fn(name).GetNodeInfo(hvparams=hvparams)


def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information for all hypervisors.

  See C{_GetHvInfo} for information on the output.

  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams

  """
  if hv_specs is None:
    return None

  result = []
  for hvname, hvparams in hv_specs:
    result.append(_GetHvInfo(hvname, hvparams, get_hv_fn))
  return result


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list of the results.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(storage_units, hv_specs, excl_stor):
  """Gives back a hash with different information about the node.

  @type storage_units: list of pairs (string, string)
  @param storage_units: List of pairs (storage unit, identifier) to ask for disk
                        space information. In case of lvm-vg, the identifier is
                        the VG name.
  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  storage_info = _GetNamedNodeInfo(
    storage_units,
    (lambda storage_unit: _ApplyStorageInfoFunction(storage_unit[0],
                                                    storage_unit[1],
                                                    excl_stor)))
  hv_info = _GetHvInfoAll(hv_specs)
  return (bootid, storage_info, hv_info)


# FIXME: implement storage reporting for all missing storage types.
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: None,
  constants.ST_LVM_PV: _GetVgSpindlesInfo,
  constants.ST_LVM_VG: _GetVgInfo,
  constants.ST_RADOS: None,
}


def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the results of the application of the storage space function (see
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
    storage type
  @raises NotImplementedError: for storage types that don't support space
    reporting yet
  """
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
  if fn is not None:
    return fn(storage_key, *args)
  else:
    raise NotImplementedError
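
# Dispatch sketch (illustrative): for an LVM volume group the call reduces to
# _GetVgInfo, e.g.
#
#   _ApplyStorageInfoFunction(constants.ST_LVM_VG, "xenvg", True)
#   # -> {"name": "xenvg", "vg_free": ..., "vg_size": ...}
#
# while types mapped to None in _STORAGE_TYPE_INFO_FN (e.g. constants.ST_FILE)
# currently raise NotImplementedError.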


def _CheckExclusivePvs(pvi_list):
  """Check that PVs are not shared among LVs

  @type pvi_list: list of L{objects.LvmPvInfo} objects
  @param pvi_list: information about the PVs

  @rtype: list of tuples (string, list of strings)
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])

  """
  res = []
  for pvi in pvi_list:
    if len(pvi.lv_list) > 1:
      res.append((pvi.name, pvi.lv_list))
  return res
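
# Shape of the result (illustrative sketch): a PV hosting more than one LV is
# reported as an offender, e.g.
#
#   [("/dev/sda1", ["lv1", "lv2"])]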


def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
                       get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hypervisor. Appends the results to the 'result' dictionary.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      hvparams = all_hvparams[hv_name]
      try:
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      result[constants.NV_HYPERVISOR][hv_name] = val


def _VerifyHvparams(what, vm_capable, result,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hvparams. Appends the results to the 'result' dictionary.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HVPARAMS in what:
    result[constants.NV_HVPARAMS] = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        get_hv_fn(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))


def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
  """Verifies the instance list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
                            all_hvparams=all_hvparams)
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val


def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
  """Verifies the node info.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_HVINFO in what and vm_capable:
    hvname = what[constants.NV_HVINFO]
    hyper = hypervisor.GetHypervisor(hvname)
    hvparams = all_hvparams[hvname]
    result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)


def VerifyNode(what, cluster_name, all_hvparams):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
  _VerifyHvparams(what, vm_capable, result)

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  _VerifyInstanceList(what, vm_capable, result, all_hvparams)

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
    result[constants.NV_FILE_STORAGE_PATHS] = \
      bdev.ComputeWrongFileStoragePaths()

  return result


def GetBlockDevSizes(devices):
  """Return the size of the given block devices.

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs
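
# Illustrative call (sketch): paths outside /dev are silently skipped, and
# sizes are reported in MiB:
#
#   GetBlockDevSizes(["/dev/sda", "/tmp/not-a-device"])
#   # -> {"/dev/sda": 476940}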


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs
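
# Note on the lv_attr decoding above (illustrative): in an attribute string
# such as "-wi-ao----", attr[0] == "v" would mark a virtual volume, attr[4]
# holds the activation state ("a" = active, "-" = inactive) and attr[5] == "o"
# means the device is open; "-wi-ao----" therefore parses as active and
# online.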


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceListForHypervisor(hname, hvparams=None,
                                 get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances of the given hypervisor.

  @type hname: string
  @param hname: name of the hypervisor
  @type hvparams: dict of strings
  @param hvparams: hypervisor parameters for the given hypervisor
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  try:
    hv = get_hv_fn(hname)
    names = hv.ListInstances(hvparams=hvparams)
    results.extend(names)
  except errors.HypervisorError, err:
    _Fail("Error enumerating instances (hypervisor %s): %s",
          hname, err, exc=True)
  return results


def GetInstanceList(hypervisor_list, all_hvparams=None,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor types to respective
    cluster-wide hypervisor parameters
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
                                                get_hv_fn=get_hv_fn))
  return results


def GetInstanceInfo(instance, hname, hvparams=None):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance
  @type hvparams: dict of strings
  @param hvparams: the instance's hvparams

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance,
                                                          hvparams=hvparams)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["vcpus"] = iinfo[3]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Computes whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.
  @raise RPCFail: if the instance is not running

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances(instance.hvparams):
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
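
# Example of a resulting path (illustrative; the directory and the timestamp
# format depend on pathutils.LOG_OS_DIR and utils.TimestampForFilename):
#
#   _InstanceLogName("add", "debootstrap", "instance1.example.com", None)
#   # -> <LOG_OS_DIR>/add-debootstrap-instance1.example.com-<timestamp>.log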


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance being renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Returns symlink path for block device.

  """
  if _dir is None:
    _dir = pathutils.DISK_LINKS_DIR

  return utils.PathJoin(_dir,
                        ("%s%s%s" %
                         (instance_name, constants.DISK_SEPARATOR, idx)))
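
# Illustrative result (assuming ":" as constants.DISK_SEPARATOR and the
# default disk links directory):
#
#   _GetBlockDevSymlinkPath("instance1.example.com", 0)
#   # -> <DISK_LINKS_DIR>/instance1.example.com:0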
1477

    
1478

    
1479
def _SymlinkBlockDev(instance_name, device_path, idx):
1480
  """Set up symlinks to a instance's block device.
1481

1482
  This is an auxiliary function run when an instance is start (on the primary
1483
  node) or when an instance is migrated (on the target node).
1484

1485

1486
  @param instance_name: the name of the target instance
1487
  @param device_path: path of the physical block device, on the node
1488
  @param idx: the disk index
1489
  @return: absolute path to the disk's symlink
1490

1491
  """
1492
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1493
  try:
1494
    os.symlink(device_path, link_name)
1495
  except OSError, err:
1496
    if err.errno == errno.EEXIST:
1497
      if (not os.path.islink(link_name) or
1498
          os.readlink(link_name) != device_path):
1499
        os.remove(link_name)
1500
        os.symlink(device_path, link_name)
1501
    else:
1502
      raise
1503

    
1504
  return link_name
1505

    
1506

    
1507
def _RemoveBlockDevLinks(instance_name, disks):
1508
  """Remove the block device symlinks belonging to the given instance.
1509

1510
  """
1511
  for idx, _ in enumerate(disks):
1512
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1513
    if os.path.islink(link_name):
1514
      try:
1515
        os.remove(link_name)
1516
      except OSError:
1517
        logging.exception("Can't remove symlink '%s'", link_name)
1518

    
1519

    
1520
def _GatherAndLinkBlockDevs(instance):
1521
  """Set up an instance's block device(s).
1522

1523
  This is run on the primary node at instance startup. The block
1524
  devices must be already assembled.
1525

1526
  @type instance: L{objects.Instance}
1527
  @param instance: the instance whose disks we shoul assemble
1528
  @rtype: list
1529
  @return: list of (disk_object, device_path)
1530

1531
  """
1532
  block_devices = []
1533
  for idx, disk in enumerate(instance.disks):
1534
    device = _RecursiveFindBD(disk)
1535
    if device is None:
1536
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1537
                                    str(disk))
1538
    device.Open()
1539
    try:
1540
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1541
    except OSError, e:
1542
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1543
                                    e.strerror)
1544

    
1545
    block_devices.append((disk, link_name))
1546

    
1547
  return block_devices
1548

    
1549

    
1550
def StartInstance(instance, startup_paused, reason, store_reason=True):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @type reason: list of reasons
  @param reason: the reason trail for this startup
  @type store_reason: boolean
  @param store_reason: whether to store the startup reason trail on file
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
    if store_reason:
      _StoreInstReasonTrail(instance.name, reason)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout, reason, store_reason=True):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this shutdown
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances(instance.hvparams):
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances(instance.hvparams):
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
        if store_reason:
          _StoreInstReasonTrail(instance.name, reason)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances(instance.hvparams):
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances(instance.hvparams):
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances(instance.hvparams):
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


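# Illustrative sketch (not part of the original module): _TryShutdown above
# follows the utils.Retry protocol, in which raising utils.RetryAgain()
# requests another attempt every 5 seconds until the timeout expires.
# A minimal standalone use of the same pattern, with _Done as a
# hypothetical predicate:
#
#   def _Poll():
#     if not _Done():
#       raise utils.RetryAgain()
#
#   utils.Retry(_Poll, 5, 60)  # poll every 5 seconds, give up after 60
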
def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this reboot
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
      result = StartInstance(instance, False, reason, store_reason=False)
      _StoreInstReasonTrail(instance.name, reason)
      return result
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances(instance.hvparams)
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status.

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)


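# Illustrative example (not part of the original module): with
# path="/dev/xvg/disk0", offset=0 and size=1024, the command built above
# expands to
#   dd if=/dev/zero seek=0 bs=1048576 oflag=direct of=/dev/xvg/disk0 count=1024
# i.e. one MiB per block, so offset and size translate directly into MiB.
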
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


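# Illustrative sketch (not part of the original module): the return value
# of _RecursiveAssembleBD is intentionally polymorphic; callers
# discriminate it roughly as follows:
#
#   result = _RecursiveAssembleBD(disk, "inst1.example.com", False)
#   if isinstance(result, BlockDev):
#     path = result.dev_path  # device was assembled on this node
#   elif result is True:
#     pass                    # nothing needed to be assembled (secondary)
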
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that, as opposed to
  assemble, we don't cache the children; shutting down different
  devices doesn't require the upper device to have been active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetdimensions(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the pair (size, spindles), where spindles is None if the
      device doesn't support that

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualDimensions())
  return result


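# Illustrative example (not part of the original module): for two disks
# where the first is a 10 GiB LV and the second cannot be found,
# BlockdevGetdimensions would return something like
#   [(10240, None), None]
# i.e. a (size in MiB, spindles or None) pair per found disk, and None for
# each missing one.
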
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.SSH_LOGIN_USER,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


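# Illustrative example (not part of the original module): for a 1024 MiB
# disk at /dev/xvg/disk0 exported to node2:/tmp/disk.img, the combined
# command run through bash is conceptually
#
#   set -e; set -o pipefail; dd if=/dev/xvg/disk0 bs=1048576 count=1024 | \
#     ssh node2 'dd of=/tmp/disk.img conv=nocreat,notrunc bs=65536 oflag=dsync'
#
# with the actual ssh invocation carrying the options built by _GetSshRunner.
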
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


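# Illustrative example (not part of the original module): for an OS
# definition whose ganeti_api_version file (constants.OS_API_FILE)
# contains the lines
#   20
#   15
# _OSOndiskAPIVersion returns (True, [20, 15]); a missing or non-numeric
# file yields (False, "<error message>") instead.
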
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS API 10)
  @rtype: dict
  @return: dict of environment variables

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result


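# Illustrative example (not part of the original module): for an OS named
# "debootstrap+default" with api_versions=[20] and
# os_params={"mirror": "http://deb.example.com"}, OSCoreEnv would return
# roughly
#   {"OS_API_VERSION": "20", "OS_NAME": "debootstrap", "DEBUG_LEVEL": "0",
#    "OS_VARIANT": "default", "OSP_MIRROR": "http://deb.example.com",
#    "PATH": constants.HOOKS_PATH}
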
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS API 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.netinfo:
      nobj = objects.Network.FromDict(nic.netinfo)
      result.update(nobj.HooksDict("NIC_%d_" % idx))
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


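# Illustrative example (not part of the original module): for an instance
# with one DRBD disk and one bridged NIC, OSEnvironment adds entries such
# as
#   DISK_COUNT=1, DISK_0_PATH=/dev/drbd0, DISK_0_ACCESS=rw,
#   NIC_COUNT=1, NIC_0_MAC=aa:00:00:35:6e:01, NIC_0_MODE=bridged,
#   NIC_0_BRIDGE=xen-br0
# on top of the OSCoreEnv variables.
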
def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result


def BlockdevGrow(disk, amount, dryrun, backingstore):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: tuple
  @return: the snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk whose metadata is to be set
  @type info: string
  @param info: new 'info' metadata
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
               "%s" % nic.network)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


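# Illustrative example (not part of the original module): the resulting
# EXPORT_CONF_FILE is a plain INI file, conceptually
#
#   [export]
#   version = 0
#   timestamp = 1385000000
#   source = node1.example.com
#   os = debootstrap
#   compression = none
#
#   [instance]
#   name = inst1.example.com
#   maxmem = 512
#   ...
#
# with section names taken from constants.INISECT_EXP and its siblings.
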
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: a serialized form of the config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
3026
  """Return a list of exports currently available on this machine.
3027

3028
  @rtype: list
3029
  @return: list of the exports
3030

3031
  """
3032
  if os.path.isdir(pathutils.EXPORT_DIR):
3033
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
3034
  else:
3035
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id); disk
      is an L{objects.Disk} object describing the current disk, and
      new_unique_id is the unique_id we rename it to
  @rtype: None
  @raise RPCFail: if any of the renames fail

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))
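
# Example of the expected input shape (values are hypothetical): each entry
# pairs the current disk object with the new unique_id, e.g. for an LV whose
# unique_id is (volume group name, LV name):
#   BlockdevRename([(disk, ("xenvg", "69d13a3c-new.disk0"))])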


def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not (constants.ENABLE_FILE_STORAGE or
          constants.ENABLE_SHARED_FILE_STORAGE):
    _Fail("File storage disabled at configure time")

  bdev.CheckFileStoragePath(fs_dir)

  return os.path.normpath(fs_dir)
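
# For example (assuming "/srv/ganeti/file-storage" is one of the allowed
# storage directories): "/srv/ganeti/file-storage//instance1" passes the
# check and is returned normalized as "/srv/ganeti/file-storage/instance1".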


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: None
  @raise RPCFail: if the path exists but is not a directory, or the
      directory cannot be created

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, the RPC call fails.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: None
  @raise RPCFail: if the path is not a directory, or is non-empty and
      cannot be removed

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: None
  @raise RPCFail: if the rename failed

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: None
  @raises RPCFail: if the file name is not valid

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None
  @raises RPCFail: if the file names are not valid

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: None
  @raise RPCFail: if any device cannot be found or cannot be switched
      to secondary mode; the error message contains the details

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

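  # OS definitions below API version 2.0 predate parameter validation,
  # so they are accepted as-is.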
  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise
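
# The resulting on-disk layout (the directory name below is illustrative;
# it comes from tempfile.mkdtemp with an "x509-<timestamp>-" prefix):
#   <cryptodir>/x509-20130101000000-AbC123/key   (mode 0400, private key)
#   <cryptodir>/x509-20130101000000-AbC123/cert  (mode 0400, certificate)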


def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes a X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly be ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc
      # to not attempt truncation on an LV device; we use oflag=dsync to not
      # buffer too much memory; this means that at best, we flush every 64k,
      # which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MiB
                                   str(disk.size))
      exp_size = disk.size
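      # (Illustrative, hypothetical device path) for a 1024 MiB disk this
      # builds a prefix like:
      #   dd if=/dev/xenvg/inst1.disk0 bs=1048576 count=1024 |
      # which the import/export daemon prepends to its transport command.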

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]
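    # (Illustrative only) after the conditional options below are appended,
    # an export invocation resembles:
    #   <daemon> <status_file> export --key=... --cert=... --ca=...
    #     --host=<host> --port=<port> --ipv4 --cmd-prefix='dd ... |'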

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if a
           status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
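    # (the delay tuple appears to be (initial, growth factor, maximum),
    # with an overall timeout of two minutes)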
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
  try:
    return drbd.DRBD8.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
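  # (locking everything into memory is best-effort: it lets the powercycle
  # proceed even if local storage is already failing, which is why errors
  # from Mlockall are deliberately ignored below)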
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


def _VerifyRestrictedCmdName(cmd):
  """Verifies a restricted command name.

  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  if not cmd.strip():
    return (False, "Missing command name")

  if os.path.basename(cmd) != cmd:
    return (False, "Invalid command name")

  if not constants.EXT_PLUGIN_MASK.match(cmd):
    return (False, "Command name contains forbidden characters")

  return (True, None)
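
# E.g. a plain name like "storage-check" passes all three tests, while
# "../foo" fails the basename check (its basename is "foo", not "../foo")
# and an empty or whitespace-only name fails the first test.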


def _CommonRestrictedCmdCheck(path, owner):
  """Common checks for restricted command file system directories and files.

  @type path: string
  @param path: Path to check
  @param owner: C{None} or tuple containing UID and GID
  @rtype: tuple; (boolean, string or C{os.stat} result)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's the result of C{os.stat}

  """
  if owner is None:
    # Default to root as owner
    owner = (0, 0)

  try:
    st = os.stat(path)
  except EnvironmentError, err:
    return (False, "Can't stat(2) '%s': %s" % (path, err))
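  # Reject any mode bits outside _RCMD_MAX_MODE (rwxr-xr-x): in particular
  # group/other write access and the setuid/setgid/sticky bits.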
  if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
    return (False, "Permissions on '%s' are too permissive" % path)

  if (st.st_uid, st.st_gid) != owner:
    (owner_uid, owner_gid) = owner
    return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))

  return (True, st)


def _VerifyRestrictedCmdDirectory(path, _owner=None):
  """Verifies restricted command directory.

  @type path: string
  @param path: Path to check
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  (status, value) = _CommonRestrictedCmdCheck(path, _owner)

  if not status:
    return (False, value)

  if not stat.S_ISDIR(value.st_mode):
    return (False, "Path '%s' is not a directory" % path)

  return (True, None)


def _VerifyRestrictedCmd(path, cmd, _owner=None):
  """Verifies a whole restricted command and returns its executable filename.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise the second element is the
    absolute path to the executable

  """
  executable = utils.PathJoin(path, cmd)

  (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)

  if not status:
    return (False, msg)

  if not utils.IsExecutable(executable):
    return (False, "access(2) thinks '%s' can't be executed" % executable)

  return (True, executable)


def _PrepareRestrictedCmd(path, cmd,
                          _verify_dir=_VerifyRestrictedCmdDirectory,
                          _verify_name=_VerifyRestrictedCmdName,
                          _verify_cmd=_VerifyRestrictedCmd):
  """Performs a number of tests on a restricted command.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @return: Same as L{_VerifyRestrictedCmd}

  """
  # Verify the directory first
  (status, msg) = _verify_dir(path)
  if status:
    # Check command if everything was alright
    (status, msg) = _verify_name(cmd)

  if not status:
    return (False, msg)

  # Check actual executable
  return _verify_cmd(path, cmd)


def RunRestrictedCmd(cmd,
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
                     _sleep_fn=time.sleep,
                     _prepare_fn=_PrepareRestrictedCmd,
                     _runcmd_fn=utils.RunCmd,
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
  """Executes a restricted command after performing strict tests.

  @type cmd: string
  @param cmd: Command name
  @rtype: string
  @return: Command output
  @raise RPCFail: In case of an error

  """
  logging.info("Preparing to run restricted command '%s'", cmd)

  if not _enabled:
    _Fail("Restricted commands disabled at configure time")

  lock = None
  try:
    cmdresult = None
    try:
      lock = utils.FileLock.Open(_lock_file)
      lock.Exclusive(blocking=True, timeout=_lock_timeout)

      (status, value) = _prepare_fn(_path, cmd)

      if status:
        cmdresult = _runcmd_fn([value], env={}, reset_env=True,
                               postfork_fn=lambda _: lock.Unlock())
      else:
        logging.error(value)
    except Exception: # pylint: disable=W0703
      # Keep original error in log
      logging.exception("Caught exception")
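    # Any failure above ends up here with cmdresult still None; a fixed
    # delay and a generic error message keep callers from probing which
    # of the checks rejected the command.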
    if cmdresult is None:
      logging.info("Sleeping for %0.1f seconds before returning",
                   _RCMD_INVALID_DELAY)
      _sleep_fn(_RCMD_INVALID_DELAY)

      # Do not include original error message in returned error
      _Fail("Executing command '%s' failed" % cmd)
    elif cmdresult.failed or cmdresult.fail_reason:
      _Fail("Restricted command '%s' failed: %s; output: %s",
            cmd, cmdresult.fail_reason, cmdresult.output)
    else:
      return cmdresult.output
  finally:
    if lock is not None:
      # Release lock at last
      lock.Close()
      lock = None


def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    logging.info("Received request to no longer pause watcher")
    utils.RemoveFile(_filename)
  else:
    logging.info("Received request to pause watcher until %s", until)

    if not ht.TNumber(until):
      _Fail("Duration must be numeric")

    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)
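
# Illustrative usage: pause the watcher for an hour, then lift the pause:
#   SetWatcherPause(time.time() + 3600)
#   SetWatcherPause(None)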


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)
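    # e.g. an "instance-start" hpath with the "post" phase maps to
    # <hooks_base_dir>/instance-start-post.d (the hpath value is
    # illustrative; it depends on the opcode being run)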

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script; failures raise RPCFail

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath
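
  # Example: "/dev/drbd0" maps to <_ROOT_DIR>/bdev_drbd0, while
  # "/dev/xenvg/inst1.disk0" maps to <_ROOT_DIR>/bdev_xenvg_inst1.disk0
  # (device names are illustrative).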

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache file for %s: %s",
                        dev_path, err)