#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti.storage import bdev
from ganeti.storage import drbd
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster
from ganeti import ht
from ganeti.storage.base import BlockDev
from ganeti.storage.drbd import DRBD8
from ganeti import hooksmaster


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
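#: Example (illustrative): a line like "  xenvg|test1|20.06|-wi-ao----"
#: yields the groups ("xenvg", "test1", "20.06", "-wi-ao----").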

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _GetInstReasonFilename(instance_name):
  """Path of the file containing the reason of the instance status change.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)


def _StoreInstReasonTrail(instance_name, trail):
  """Serialize a reason trail related to an instance change of state to file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: None

  """
  json = serializer.DumpJson(trail)
  filename = _GetInstReasonFilename(instance_name)
  utils.WriteFile(filename, data=json)


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
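

# Usage sketch (illustrative, not part of the original module): the format
# arguments are passed separately, so callers typically write e.g.
#   _Fail("Failed to list logical volumes, lvs output: %s", result.output)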


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
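

# Illustrative sketch (not part of the original module): one way a client
# could build a payload that _Decompress unpacks. The helper name and the
# 512-byte threshold are assumptions made for this example only.
def _ExampleCompressForRpc(data):
  """Return an (encoding, content) pair accepted by _Decompress."""
  if len(data) < 512:
    # small payloads are cheaper to send uncompressed
    return (constants.RPC_ENCODING_NONE, data)
  return (constants.RPC_ENCODING_ZLIB_BASE64,
          base64.b64encode(zlib.compress(data)))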


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
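

# Usage (see ActivateMasterIp and DeactivateMasterIp below): the decorated
# function keeps its signature, e.g.
#
#   @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
#                  _BuildMasterIpEnv)
#   def ActivateMasterIp(master_params, use_external_mip_script):
#     ...
#
# and its arguments are also forwarded to the environment builder function.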


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    L{RunLocalHooks} decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate in; either add or remove an entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shut down the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _GetVgInfo(name, excl_stor):
  """Retrieves information about an LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = bdev.LogicalVolume.GetVGInfo([name], excl_stor)
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "name": name,
    "vg_free": vg_free,
    "vg_size": vg_size,
    }


def _GetVgSpindlesInfo(name, excl_stor):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary whose keys are "name", "vg_free", "vg_size" for VG name,
      free spindles, total spindles respectively

  """
  if excl_stor:
    (vg_free, vg_size) = bdev.LogicalVolume.GetVgSpindlesInfo(name)
  else:
    vg_free = 0
    vg_size = 0
  return {
    "name": name,
    "vg_free": vg_free,
    "vg_size": vg_size,
    }


def _GetHvInfo(name):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total amount of ram in MiB
    - hv_version: the hypervisor version, if available

  """
  return hypervisor.GetHypervisor(name).GetNodeInfo()


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list of results.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(storage_units, hv_names, excl_stor):
  """Gives back a hash with different information about the node.

  @type storage_units: list of pairs (string, string)
  @param storage_units: List of pairs (storage unit, identifier) to ask for disk
                        space information. In case of lvm-vg, the identifier is
                        the VG name.
  @type hv_names: list of string
  @param hv_names: Names of the hypervisors to ask for node information
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active
  @rtype: tuple; (string, None/list of dicts, None/list of dicts)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  storage_info = _GetNamedNodeInfo(
    storage_units,
    (lambda storage_unit: _ApplyStorageInfoFunction(storage_unit[0],
                                                    storage_unit[1],
                                                    excl_stor)))
  hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)

  return (bootid, storage_info, hv_info)


# FIXME: implement storage reporting for all missing storage types.
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: None,
  constants.ST_LVM_PV: _GetVgSpindlesInfo,
  constants.ST_LVM_VG: _GetVgInfo,
  constants.ST_RADOS: None,
}


def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the results of the application of the storage space function (see
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
    storage type
  @raises NotImplementedError: for storage types that don't support space
    reporting yet
  """
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
  if fn is not None:
    return fn(storage_key, *args)
  else:
    raise NotImplementedError
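
# Example (illustrative): for an LVM volume group this dispatches to
# _GetVgInfo, so a call such as
#   _ApplyStorageInfoFunction(constants.ST_LVM_VG, "xenvg", True)
# returns a dict with the keys "name", "vg_free" and "vg_size".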


def _CheckExclusivePvs(pvi_list):
  """Check that PVs are not shared among LVs

  @type pvi_list: list of L{objects.LvmPvInfo} objects
  @param pvi_list: information about the PVs

  @rtype: list of tuples (string, list of strings)
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])

  """
  res = []
  for pvi in pvi_list:
    if len(pvi.lv_list) > 1:
      res.append((pvi.name, pvi.lv_list))
  return res


def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
                       get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hypervisor. Appends the results to the 'result' dict.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      hvparams = all_hvparams[hv_name]
      try:
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      result[constants.NV_HYPERVISOR][hv_name] = val


def _VerifyHvparams(what, vm_capable, result,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hvparams. Appends the results to the 'result' dict.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HVPARAMS in what:
    result[constants.NV_HVPARAMS] = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        get_hv_fn(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))


def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
  """Verifies the instance list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
                            all_hvparams=all_hvparams)
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val


def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
  """Verifies the node info.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_HVINFO in what and vm_capable:
    hvname = what[constants.NV_HVINFO]
    hyper = hypervisor.GetHypervisor(hvname)
    hvparams = all_hvparams[hvname]
    result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)


def VerifyNode(what, cluster_name, all_hvparams):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
  _VerifyHvparams(what, vm_capable, result)

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  _VerifyInstanceList(what, vm_capable, result, all_hvparams)

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
    result[constants.NV_FILE_STORAGE_PATHS] = \
      bdev.ComputeWrongFileStoragePaths()

  return result


def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue
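
      # blockdev --getsize64 reports the size in bytes; convert to MiB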
      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
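    # lv_attr positions (see lvs(8)): 0 is the volume type ("v" means
    # virtual), 4 is the state ("-" means inactive) and 5 says whether
    # the device is open ("o")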
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, the function raises an RPCFail exception;
    strange output lines are logged and skipped.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: None
  @raise RPCFail: in case any of the bridges is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
1198

    
1199

    
1200
def GetInstanceListForHypervisor(hname, hvparams=None,
1201
                                 get_hv_fn=hypervisor.GetHypervisor):
1202
  """Provides a list of instances of the given hypervisor.
1203

1204
  @type hname: string
1205
  @param hname: name of the hypervisor
1206
  @type hvparams: dict of strings
1207
  @param hvparams: hypervisor parameters for the given hypervisor
1208
  @type get_hv_fn: function
1209
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
1210
    name; optional parameter to increase testability
1211

1212
  @rtype: list
1213
  @return: a list of all running instances on the current node
1214
    - instance1.example.com
1215
    - instance2.example.com
1216

1217
  """
1218
  results = []
1219
  try:
1220
    hv = get_hv_fn(hname)
1221
    names = hv.ListInstances(hvparams=hvparams)
1222
    results.extend(names)
1223
  except errors.HypervisorError, err:
1224
    _Fail("Error enumerating instances (hypervisor %s): %s",
1225
          hname, err, exc=True)
1226
  return results


def GetInstanceList(hypervisor_list, all_hvparams=None,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor types to respective
    cluster-wide hypervisor parameters
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
                                                get_hv_fn=get_hv_fn))
  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["vcpus"] = iinfo[3]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output
1282

    
1283

    
1284
def GetInstanceMigratable(instance):
1285
  """Computes whether an instance can be migrated.
1286

1287
  @type instance: L{objects.Instance}
1288
  @param instance: object representing the instance to be checked.
1289

1290
  @rtype: tuple
1291
  @return: tuple of (result, description) where:
1292
      - result: whether the instance can be migrated or not
1293
      - description: a description of the issue, if relevant
1294

1295
  """
1296
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1297
  iname = instance.name
1298
  if iname not in hyper.ListInstances(instance.hvparams):
1299
    _Fail("Instance %s is not running", iname)
1300

    
1301
  for idx in range(len(instance.disks)):
1302
    link_name = _GetBlockDevSymlinkPath(iname, idx)
1303
    if not os.path.islink(link_name):
1304
      logging.warning("Instance %s is missing symlink %s for disk %d",
1305
                      iname, link_name, idx)
1306

    
1307

    


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
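
# Example (illustrative): importing a disk component would log to a file
# named like
#   <LOG_OS_DIR>/import-debian-inst1.example.com-disk0-<timestamp>.log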


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Returns symlink path for block device.

  """
  if _dir is None:
    _dir = pathutils.DISK_LINKS_DIR

  return utils.PathJoin(_dir,
                        ("%s%s%s" %
                         (instance_name, constants.DISK_SEPARATOR, idx)))


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
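      # an entry already exists: replace it if it is not a symlink or
      # if it points at a different device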
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices


def StartInstance(instance, startup_paused, reason, store_reason=True):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @type reason: list of reasons
  @param reason: the reason trail for this startup
  @type store_reason: boolean
  @param store_reason: whether to store the startup reason trail on file
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
    if store_reason:
      _StoreInstReasonTrail(instance.name, reason)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)
1560

    
1561
def InstanceShutdown(instance, timeout, reason, store_reason=True):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this shutdown
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances(instance.hvparams):
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances(instance.hvparams):
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
        if store_reason:
          _StoreInstReasonTrail(instance.name, reason)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances(instance.hvparams):
          # if the instance no longer exists, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances(instance.hvparams):
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances(instance.hvparams):
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


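# Illustrative sketch of the escalation above (not in the original file):
# with the hardcoded 5s retry delay, a call such as
#
#   InstanceShutdown(instance, timeout=30, reason=[])
#
# asks the hypervisor for a graceful stop roughly every 5 seconds (the
# second and later attempts with retry=True), and only after the 30s
# budget is exhausted falls back to StopInstance(force=True), i.e. a
# hypervisor-level destroy, before cleaning up the disk symlinks.

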
def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this reboot
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
      result = StartInstance(instance, False, reason, store_reason=False)
      _StoreInstReasonTrail(instance.name, reason)
      return result
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances(instance.hvparams)
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status.

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)


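# For illustration only (not in the original source): wiping 128 MiB at a
# 512 MiB offset on a hypothetical LV resolves to a command line like
#
#   dd if=/dev/zero seek=512 bs=1048576 oflag=direct \
#      of=/dev/xenvg/disk0 count=128
#
# i.e. both seek= and count= are expressed in 1 MiB blocks, which is why
# callers must pass offset/size in mebibytes.

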
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


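# Shape of the return value, sketched here for clarity (not part of the
# original file): one (status, error) pair per input disk, e.g. for two
# disks where only the first could be paused:
#
#   [(True, None),
#    (False, "Cannot change sync for device disk/1: device not found")]

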
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


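# A worked example of the recursion above (illustrative, not original
# code): for a typical DRBD8 disk backed by two LVs (data + metadata),
# _RecursiveAssembleBD first assembles the two LV children, then the
# DRBD device on top of them:
#
#   drbd_disk.children == [data_lv, meta_lv]
#   _RecursiveAssembleBD(drbd_disk, "inst1.example.com", True)
#     -> assembles data_lv and meta_lv (children may be None up to the
#        ChildrenNeeded() slack, e.g. a degraded mirror)
#     -> bdev.Assemble(drbd_disk, [data_dev, meta_dev]), then Open()
#
# On a secondary node as_primary is False, so the device is assembled
# but, unless OpenOnSecondary() says otherwise, left read-only.

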
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children, as opposed to assemble: shutting down different devices
  doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetdimensions(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the pair (size, spindles), where spindles is None if the
      device doesn't support that

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualDimensions())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt to truncate an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.SSH_LOGIN_USER,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


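# For a 1024 MiB disk the combined pipeline above ends up looking roughly
# like the following (illustrative only; paths and host are made up, and
# the ssh options come from _GetSshRunner):
#
#   set -e; set -o pipefail; \
#   dd if=/dev/xenvg/disk0 bs=1048576 count=1024 \
#     | ssh <options> node2.example.com \
#         'dd of=/var/tmp/disk0.img conv=nocreat,notrunc bs=65536 oflag=dsync'
#
# The reader side streams in 1 MiB blocks to match disk.size units, while
# the writer side uses the smaller 64 KiB blocks discussed above.

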
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


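# Usage sketch (hypothetical program and node, not from the original
# file): querying the power status of a node through its OOB helper is
# simply
#
#   out = RunOob("/usr/lib/ganeti/oob/power-helper", "power-status",
#                "node3.example.com", 60)
#
# and the helper's stdout is passed back verbatim to the caller.

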
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


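# Example of the on-disk layout this parses (illustrative, path made up):
# an OS definition at /srv/ganeti/os/debootstrap whose API version file
# (constants.OS_API_FILE) contains the lines "20" and "15" yields
#
#   _OSOndiskAPIVersion("/srv/ganeti/os/debootstrap")
#   -> (True, [20, 15])
#
# while a missing or non-numeric file yields (False, "<error message>").

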
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result


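# Sketch of the resulting environment (illustrative values only): for an
# OS named "debootstrap+default" with os_params {"dhcp": "yes"}, the dict
# would contain roughly
#
#   {"OS_API_VERSION": "20", "OS_NAME": "debootstrap",
#    "DEBUG_LEVEL": "0", "OS_VARIANT": "default",
#    "OSP_DHCP": "yes", "PATH": constants.HOOKS_PATH}
#
# OSEnvironment() below layers the per-instance variables (disks, NICs,
# HV/BE parameters) on top of this core set.

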
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.netinfo:
      nobj = objects.Network.FromDict(nic.netinfo)
      result.update(nobj.HooksDict("NIC_%d_" % idx))
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list of L{objects.ExtStorage}
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result


def BlockdevGrow(disk, amount, dryrun, backingstore):
  """Grow a stack of block devices.

  This function is called recursively, with the children being
  resized first.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk on which to set the information
  @type info: string
  @param info: new 'info' metadata
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
               "%s" % nic.network)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


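# The written file is a plain INI document; a trimmed, illustrative
# excerpt (section names assumed from the INISECT_* constants, values
# made up) of what ends up under <EXPORT_DIR>/<instance>/ looks like:
#
#   [export]
#   version = 0
#   timestamp = 1356006000
#   source = node1.example.com
#   os = debootstrap+default
#   compression = none
#
#   [instance]
#   name = inst1.example.com
#   maxmem = 1024
#   ...
#
# Writing to "<instance>.new" and only then renaming over the final
# directory keeps a half-written export from shadowing a previous one.

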
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the logical/physical id we rename it to
  @rtype: None
  @raise RPCFail: if any of the renames failed

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


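# Illustrative call for BlockdevRename above (hypothetical logical id for an
# LVM-backed disk; new_unique_id has the same form as disk.logical_id):
#
#   BlockdevRename([(disk, ("xenvg", "0c3f6b7a-renamed.disk0"))])
#
# Nothing is returned on success; any failed rename raises RPCFail via _Fail.

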
def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid
  @raise RPCFail: if file storage was disabled at configure time

  """
  if not (constants.ENABLE_FILE_STORAGE or
          constants.ENABLE_SHARED_FILE_STORAGE):
    _Fail("File storage disabled at configure time")

  bdev.CheckFileStoragePath(fs_dir)

  return os.path.normpath(fs_dir)


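# Example behaviour of _TransformFileStorageDir above (paths hypothetical):
# assuming the cluster-wide file storage dir is /srv/ganeti/file-storage,
#
#   _TransformFileStorageDir("/srv/ganeti/file-storage/inst1.example.com")
#
# returns the normalized path unchanged, while a path outside the allowed
# directories (e.g. "/etc/passwd") is rejected by bdev.CheckFileStoragePath.

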
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: None
  @raise RPCFail: if the path exists and is not a directory, or if
      the directory cannot be created

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty; if it is not, the RPC call fails.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should clean up
  @rtype: None
  @raise RPCFail: if the path is not a directory or cannot be removed

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: None
  @raise RPCFail: if the rename cannot be performed

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: None
  @raise RPCFail: if the file name is not below the queue directory

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None
  @raise RPCFail: if either file name is not below the queue directory

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)


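# Illustrative use of JobQueueRename above (virtual paths hypothetical): the
# master archives a finished job by renaming it into the archive directory,
# e.g.
#
#   JobQueueRename("/var/lib/ganeti/queue/job-123",
#                  "/var/lib/ganeti/queue/archive/0/job-123")
#
# Both names must resolve below pathutils.QUEUE_DIR or the call fails.

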
def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: None
  @raise RPCFail: if a device cannot be found or cannot be closed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


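# For a certificate named "x509-20130101000000-abc123" and a crypto directory
# of /var/run/ganeti/crypto (both hypothetical), _GetX509Filenames above
# returns the certificate directory plus the key and cert files inside it:
#
#   ("/var/run/ganeti/crypto/x509-20130101000000-abc123",
#    "/var/run/ganeti/crypto/x509-20130101000000-abc123/key",
#    "/var/run/ganeti/crypto/x509-20130101000000-abc123/cert")

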
def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly be ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc to
      # not attempt truncation on an LV device; we use oflag=dsync to not
      # buffer too much memory; this means that at best, we flush every 64k,
      # which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MiB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


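# Putting _GetImportExportIoCommand above together for a raw-disk export of a
# 10 GiB disk (device path hypothetical), the returned values would be
# roughly:
#
#   env = None
#   prefix = "dd if=/dev/xenvg/inst1.disk0 bs=1048576 count=10240 |"
#   suffix = None
#   exp_size = 10240    # disk.size, in MiB
#
# An import to the same device would instead set suffix to
# "| dd of=/dev/xenvg/inst1.disk0 conv=nocreat,notrunc bs=65536 oflag=dsync".

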
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


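# Sketch of the command list built by StartImportExportDaemon above for an
# export (host and port hypothetical, other flags depending on the options):
#
#   cmd = [pathutils.IMPORT_EXPORT_DAEMON, status_file, constants.IEM_EXPORT,
#          "--key=" + key_path, "--cert=" + cert_path, "--ca=" + ca_file,
#          "--host=198.51.100.7", "--port=1234", "--ipv4",
#          "--connect-retries=%s" % constants.RIE_CONNECT_RETRIES]
#
# The returned handle, os.path.basename(status_dir), is what the master later
# passes to GetImportExportStatus, AbortImportExport and CleanupImportExport.

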
def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if a
           status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


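# Note on the utils.Retry call above: the (0.1, 1.5, 5.0) tuple is a backoff
# specification (start at 0.1s, multiply the delay by 1.5 after each attempt,
# cap it at 5.0s), with an overall deadline of 2 * 60 seconds; _Attach is
# re-run until it stops raising RetryAgain or utils.RetryTimeout is raised.

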
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


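# DrbdWaitSync above returns, for example, (True, 100) once every device is
# connected and fully synced, or (False, 42.5) while at least one device is
# still resyncing and the furthest-behind one reports 42.5 percent done
# (values illustrative).

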
def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
  try:
    return drbd.DRBD8.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


def _VerifyRestrictedCmdName(cmd):
  """Verifies a restricted command name.

  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  if not cmd.strip():
    return (False, "Missing command name")

  if os.path.basename(cmd) != cmd:
    return (False, "Invalid command name")

  if not constants.EXT_PLUGIN_MASK.match(cmd):
    return (False, "Command name contains forbidden characters")

  return (True, None)


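# Examples for _VerifyRestrictedCmdName above (names hypothetical): a plain
# name like "fence-node" passes all three tests, while "" (empty), "sub/cmd"
# (not a bare basename) and names with characters outside
# constants.EXT_PLUGIN_MASK are each rejected with the matching error
# message.

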
def _CommonRestrictedCmdCheck(path, owner):
  """Common checks for restricted command file system directories and files.

  @type path: string
  @param path: Path to check
  @param owner: C{None} or tuple containing UID and GID
  @rtype: tuple; (boolean, string or C{os.stat} result)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's the result of C{os.stat}

  """
  if owner is None:
    # Default to root as owner
    owner = (0, 0)

  try:
    st = os.stat(path)
  except EnvironmentError, err:
    return (False, "Can't stat(2) '%s': %s" % (path, err))

  if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
    return (False, "Permissions on '%s' are too permissive" % path)

  if (st.st_uid, st.st_gid) != owner:
    (owner_uid, owner_gid) = owner
    return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))

  return (True, st)


def _VerifyRestrictedCmdDirectory(path, _owner=None):
  """Verifies restricted command directory.

  @type path: string
  @param path: Path to check
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  (status, value) = _CommonRestrictedCmdCheck(path, _owner)

  if not status:
    return (False, value)

  if not stat.S_ISDIR(value.st_mode):
    return (False, "Path '%s' is not a directory" % path)

  return (True, None)


def _VerifyRestrictedCmd(path, cmd, _owner=None):
  """Verifies a whole restricted command and returns its executable filename.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise the second element is the
    absolute path to the executable

  """
  executable = utils.PathJoin(path, cmd)

  (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)

  if not status:
    return (False, msg)

  if not utils.IsExecutable(executable):
    return (False, "access(2) thinks '%s' can't be executed" % executable)

  return (True, executable)


def _PrepareRestrictedCmd(path, cmd,
                          _verify_dir=_VerifyRestrictedCmdDirectory,
                          _verify_name=_VerifyRestrictedCmdName,
                          _verify_cmd=_VerifyRestrictedCmd):
  """Performs a number of tests on a restricted command.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @return: Same as L{_VerifyRestrictedCmd}

  """
  # Verify the directory first
  (status, msg) = _verify_dir(path)
  if status:
    # Check command if everything was alright
    (status, msg) = _verify_name(cmd)

  if not status:
    return (False, msg)

  # Check actual executable
  return _verify_cmd(path, cmd)


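# The checks in _PrepareRestrictedCmd above short-circuit: directory first,
# then the command name, then the executable itself. For a hypothetical
# command "fence-node",
#
#   (status, value) = _PrepareRestrictedCmd(
#     pathutils.RESTRICTED_COMMANDS_DIR, "fence-node")
#
# yields (True, <absolute path to the executable>) only if both directory and
# file are root-owned and no more permissive than _RCMD_MAX_MODE.

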
def RunRestrictedCmd(cmd,
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
                     _sleep_fn=time.sleep,
                     _prepare_fn=_PrepareRestrictedCmd,
                     _runcmd_fn=utils.RunCmd,
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
  """Executes a restricted command after performing strict tests.

  @type cmd: string
  @param cmd: Command name
  @rtype: string
  @return: Command output
  @raise RPCFail: In case of an error

  """
  logging.info("Preparing to run restricted command '%s'", cmd)

  if not _enabled:
    _Fail("Restricted commands disabled at configure time")

  lock = None
  try:
    cmdresult = None
    try:
      lock = utils.FileLock.Open(_lock_file)
      lock.Exclusive(blocking=True, timeout=_lock_timeout)

      (status, value) = _prepare_fn(_path, cmd)

      if status:
        cmdresult = _runcmd_fn([value], env={}, reset_env=True,
                               postfork_fn=lambda _: lock.Unlock())
      else:
        logging.error(value)
    except Exception: # pylint: disable=W0703
      # Keep original error in log
      logging.exception("Caught exception")

    if cmdresult is None:
      logging.info("Sleeping for %0.1f seconds before returning",
                   _RCMD_INVALID_DELAY)
      _sleep_fn(_RCMD_INVALID_DELAY)

      # Do not include original error message in returned error
      _Fail("Executing command '%s' failed" % cmd)
    elif cmdresult.failed or cmdresult.fail_reason:
      _Fail("Restricted command '%s' failed: %s; output: %s",
            cmd, cmdresult.fail_reason, cmdresult.output)
    else:
      return cmdresult.output
  finally:
    if lock is not None:
      # Release lock at last
      lock.Close()
      lock = None


def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    logging.info("Received request to no longer pause watcher")
    utils.RemoveFile(_filename)
  else:
    logging.info("Received request to pause watcher until %s", until)

    if not ht.TNumber(until):
      _Fail("Duration must be numeric")

    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)


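# The pause file written by SetWatcherPause above contains just the timestamp
# and a newline: pausing until Unix time 1356998400 (hypothetical) results in
# a file containing "1356998400\n", while calling it with until=None removes
# the file again.

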
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS},
        L{constants.HKR_FAIL} or L{constants.HKR_SKIP}
      - output of the script

    @raise RPCFail: for an unknown hooks phase

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


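# Illustrative return value of HooksRunner.RunHooks above for a "post" phase
# with two scripts (names and output hypothetical):
#
#   [("cluster-verify-post.d/00-check", constants.HKR_SUCCESS, "all fine"),
#    ("cluster-verify-post.d/99-broken", constants.HKR_FAIL, "exit code 1")]

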
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script
    @raise RPCFail: if the allocator cannot be found or its run fails

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache file for %s: %s",
                        dev_path, err)