#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti.storage import bdev
from ganeti.storage import drbd
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster
from ganeti import ht
from ganeti.storage.base import BlockDev
from ganeti.storage.drbd import DRBD8
from ganeti import hooksmaster


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
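
# Example (illustrative): with the "lvs" invocation used in GetVolumeList
# below, a typical output line looks like
#   "  xenvg|test1|20.06|-wi-ao----"
# which _LVSLINE_REGEX captures as
#   ("xenvg", "test1", "20.06", "-wi-ao----").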

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)
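
# Note: the mode above is equivalent to 0755 (owner: read/write/execute;
# group and others: read/execute only).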

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _GetInstReasonFilename(instance_name):
  """Path of the file containing the reason of the instance status change.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)


def _StoreInstReasonTrail(instance_name, trail):
  """Serialize a reason trail related to an instance change of state to file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: None

  """
  json = serializer.DumpJson(trail)
  filename = _GetInstReasonFilename(instance_name)
  utils.WriteFile(filename, data=json)


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
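
# Example (illustrative, mirroring calls elsewhere in this module): log the
# full exception trail and turn it into an RPC failure:
#   _Fail("Cluster configuration incomplete: %s", err, exc=True)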


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
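
# Example (illustrative): the client sends an (encoding, content) pair, so
# both of the following return "hello":
#   _Decompress((constants.RPC_ENCODING_NONE, "hello"))
#   _Decompress((constants.RPC_ENCODING_ZLIB_BASE64,
#                base64.b64encode(zlib.compress("hello"))))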


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
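
# Example (illustrative, as done by JobQueuePurge below): clean the queue
# directory while keeping the lock file in place:
#   _CleanDirectory(pathutils.QUEUE_DIR,
#                   exclude=[pathutils.JOB_QUEUE_LOCK_FILE])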


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
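
# Example (illustrative): this decorator is used for the master IP turnup and
# turndown functions below, e.g.:
#   @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
#                  _BuildMasterIpEnv)
#   def ActivateMasterIp(master_params, use_external_mip_script):
#     ...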


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    L{RunLocalHooks} decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @type modify_ssh_setup: boolean
  @param modify_ssh_setup: whether to remove the SSH keys of this node

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _GetVgInfo(name, excl_stor):
  """Retrieves information about an LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = bdev.LogicalVolume.GetVGInfo([name], excl_stor)
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "name": name,
    "vg_free": vg_free,
    "vg_size": vg_size,
    }


def _GetVgSpindlesInfo(name, excl_stor):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary whose keys are "name", "vg_free", "vg_size" for VG name,
      free spindles, total spindles respectively

  """
  if excl_stor:
    (vg_free, vg_size) = bdev.LogicalVolume.GetVgSpindlesInfo(name)
  else:
    vg_free = 0
    vg_size = 0
  return {
    "name": name,
    "vg_free": vg_free,
    "vg_size": vg_size,
    }


def _GetHvInfo(name):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) RAM in MiB
    - memory_total is the total amount of RAM in MiB
    - hv_version: the hypervisor version, if available

  """
  return hypervisor.GetHypervisor(name).GetNodeInfo()


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list of the results.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(storage_units, hv_names, excl_stor):
  """Gives back a hash with different information about the node.

  @type storage_units: list of pairs (string, string)
  @param storage_units: List of pairs (storage unit, identifier) to ask for disk
                        space information. In case of lvm-vg, the identifier is
                        the VG name.
  @type hv_names: list of string
  @param hv_names: Names of the hypervisors to ask for node information
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active
  @rtype: tuple; (string, None/list, None/list)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  storage_info = _GetNamedNodeInfo(
    storage_units,
    (lambda storage_unit: _ApplyStorageInfoFunction(storage_unit[0],
                                                    storage_unit[1],
                                                    excl_stor)))
  hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)

  return (bootid, storage_info, hv_info)


# FIXME: implement storage reporting for all missing storage types.
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: None,
  constants.ST_LVM_PV: _GetVgSpindlesInfo,
  constants.ST_LVM_VG: _GetVgInfo,
  constants.ST_RADOS: None,
}


def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the results of the application of the storage space function (see
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
    storage type
  @raises NotImplementedError: for storage types that don't support space
    reporting yet
  """
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
  if fn is not None:
    return fn(storage_key, *args)
  else:
    raise NotImplementedError
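
# Example (illustrative): for an LVM volume group this dispatches to
# _GetVgInfo, e.g.
#   _ApplyStorageInfoFunction(constants.ST_LVM_VG, "xenvg", excl_stor)
# returns {"name": "xenvg", "vg_free": ..., "vg_size": ...}, while storage
# types mapped to None above raise NotImplementedError.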


def _CheckExclusivePvs(pvi_list):
  """Check that PVs are not shared among LVs.

  @type pvi_list: list of L{objects.LvmPvInfo} objects
  @param pvi_list: information about the PVs

  @rtype: list of tuples (string, list of strings)
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])

  """
  res = []
  for pvi in pvi_list:
    if len(pvi.lv_list) > 1:
      res.append((pvi.name, pvi.lv_list))
  return res


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
    result[constants.NV_FILE_STORAGE_PATHS] = \
      bdev.ComputeWrongFileStoragePaths()

  return result


def GetBlockDevSizes(devices):
  """Return the size of the given block devices.

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs
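
# Note: in the lv_attr string parsed above, the character at index 4 is the
# LV state ('a' for active, '-' for inactive) and the one at index 5 is the
# open state ('o' when the device is open), hence the attr[4]/attr[5] checks.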


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @raise RPCFail: if any of the given bridges is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceList(hypervisor_list, all_hvparams=None,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query for instance data
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor types to respective
    cluster-wide hypervisor parameters
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      hvparams = None
      if all_hvparams is not None:
        hvparams = all_hvparams[hname]
      hv = get_hv_fn(hname)
      names = hv.ListInstances(hvparams)
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["vcpus"] = iinfo[3]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @raise RPCFail: if the instance is not running; missing disk
      symlinks are only logged as warnings

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
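
# Note: the resulting path has the form
#   <LOG_OS_DIR>/<kind>-<os_name>-<instance>[-<component>]-<timestamp>.log
# e.g. (names illustrative) "add-debootstrap-inst1.example.com-....log".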


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Returns symlink path for block device.

  """
  if _dir is None:
    _dir = pathutils.DISK_LINKS_DIR

  return utils.PathJoin(_dir,
                        ("%s%s%s" %
                         (instance_name, constants.DISK_SEPARATOR, idx)))
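
# Example (illustrative): for instance "inst1.example.com" and disk index 0
# this returns pathutils.DISK_LINKS_DIR joined with
# "inst1.example.com" + constants.DISK_SEPARATOR + "0".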


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices


def StartInstance(instance, startup_paused, reason, store_reason=True):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @type reason: list of reasons
  @param reason: the reason trail for this startup
  @type store_reason: boolean
  @param store_reason: whether to store the startup reason trail on file
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
    if store_reason:
      _StoreInstReasonTrail(instance.name, reason)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout, reason, store_reason=True):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded 5-second interval.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this shutdown
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
        if store_reason:
          _StoreInstReasonTrail(instance.name, reason)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)
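
# Note: utils.Retry(_TryShutdown(), 5, timeout) above re-invokes the
# _TryShutdown callable (which signals "not done yet" by raising
# utils.RetryAgain) every 5 seconds until the instance is gone or the soft
# shutdown timeout expires; only then is the instance forcibly stopped.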
1527

    
1528

    
1529
def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this reboot
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
      result = StartInstance(instance, False, reason, store_reason=False)
      _StoreInstReasonTrail(instance.name, reason)
      return result
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances()
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)


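# For illustration: with a hypothetical path "/dev/vg/lv", offset 1024 and
# size 512, the command built above is equivalent to
#
#   dd if=/dev/zero seek=1024 bs=1048576 oflag=direct of=/dev/vg/lv count=512
#
# i.e. 512 MiB of zeroes written starting 1 GiB into the device (seek counts
# in units of bs, which is 1 MiB here).

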
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disks object we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


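# The per-disk result shape is (bool, error-or-None); e.g. a two-disk call
# where the second device could not be found would return something like
#
#   [(True, None), (False, "Cannot change sync for device disk/1: ...")]
#
# (the values shown are illustrative only).

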
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


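# A note on the "mcn" arithmetic above: ChildrenNeeded() returning -1 means
# every child is required (zero Nones tolerated); otherwise up to
# len(children) - ChildrenNeeded() children may fail to assemble. E.g. for a
# hypothetical mirrored device with two children of which one is needed, a
# single failed child is logged and recorded as None instead of aborting
# the assembly.

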
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shut down. Then the children of the device are
  shut down.

  This function is called recursively. Note that, as opposed to
  assemble, we don't cache the children: shutting down different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetdimensions(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the pair (size, spindles), where spindles is None if the
      device doesn't support that

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualDimensions())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.SSH_LOGIN_USER,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


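# The assembled pipeline, for a hypothetical 1024 MiB disk exported to node
# "node2" at "/tmp/disk.img", looks roughly like:
#
#   set -e; set -o pipefail; dd if=/dev/... bs=1048576 count=1024 \
#     | ssh ... node2 'dd of=/tmp/disk.img conv=nocreat,notrunc bs=65536 \
#         oflag=dsync'
#
# (node name, path and size are illustrative only).

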
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


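# Example of the (status, data) contract: an API file containing the two
# lines "20" and "15" yields (True, [20, 15]), while a missing file yields
# (False, "Required file '...' not found under path ...").

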
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


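# To summarize the file checks above: all scripts in constants.OS_SCRIPTS
# are always required (and must be executable regular files); the variants
# file becomes an optional extra from OS API 15 on; the parameters file is
# required, and the verify script expected, only from OS API 20 on.

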
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result


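# A sketch of the resulting environment, for a hypothetical OS named
# "myos+default" with os_params {"mirror": "http://example.com"}:
#
#   OS_API_VERSION=20  OS_NAME=myos  OS_VARIANT=default
#   OSP_MIRROR=http://example.com  DEBUG_LEVEL=0  PATH=<constants.HOOKS_PATH>
#
# (names and values are illustrative only).

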
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.netinfo:
      nobj = objects.Network.FromDict(nic.netinfo)
      result.update(nobj.HooksDict("NIC_%d_" % idx))
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list of L{objects.ExtStorage}
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result


def BlockdevGrow(disk, amount, dryrun, backingstore):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


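# Recursion pattern in BlockdevSnapshot, in short: a DRBD8 disk delegates to
# its first child (the local LVM leaf), and only an LV is actually
# snapshotted; anything else is rejected. So for a typical DRBD-over-LVM
# disk the call chain is BlockdevSnapshot(drbd) -> BlockdevSnapshot(lv)
# -> r_dev.Snapshot(disk.size).

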
def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk on which to set the information
  @type info: string
  @param info: new 'info' metadata
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
               "%s" % nic.network)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load we can read NICs until one doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
2965
  """Checks whether given file_storage_dir is valid.
2966

2967
  Checks wheter the given fs_dir is within the cluster-wide default
2968
  file_storage_dir or the shared_file_storage_dir, which are stored in
2969
  SimpleStore. Only paths under those directories are allowed.
2970

2971
  @type fs_dir: str
2972
  @param fs_dir: the path to check
2973

2974
  @return: the normalized path if valid, None otherwise
2975

2976
  """
2977
  if not (constants.ENABLE_FILE_STORAGE or
2978
          constants.ENABLE_SHARED_FILE_STORAGE):
2979
    _Fail("File storage disabled at configure time")
2980

    
2981
  bdev.CheckFileStoragePath(fs_dir)
2982

    
2983
  return os.path.normpath(fs_dir)
2984

    
2985

    
2986
def CreateFileStorageDir(file_storage_dir):
2987
  """Create file storage directory.
2988

2989
  @type file_storage_dir: str
2990
  @param file_storage_dir: directory to create
2991

2992
  @rtype: tuple
2993
  @return: tuple with first element a boolean indicating wheter dir
2994
      creation was successful or not
2995

2996
  """
2997
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2998
  if os.path.exists(file_storage_dir):
2999
    if not os.path.isdir(file_storage_dir):
3000
      _Fail("Specified storage dir '%s' is not a directory",
3001
            file_storage_dir)
3002
  else:
3003
    try:
3004
      os.makedirs(file_storage_dir, 0750)
3005
    except OSError, err:
3006
      _Fail("Cannot create file storage directory '%s': %s",
3007
            file_storage_dir, err, exc=True)
3008

    
3009

    
3010
def RemoveFileStorageDir(file_storage_dir):
3011
  """Remove file storage directory.
3012

3013
  Remove it only if it's empty. If not log an error and return.
3014

3015
  @type file_storage_dir: str
3016
  @param file_storage_dir: the directory we should cleanup
3017
  @rtype: tuple (success,)
3018
  @return: tuple of one element, C{success}, denoting
3019
      whether the operation was successful
3020

3021
  """
3022
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
3023
  if os.path.exists(file_storage_dir):
3024
    if not os.path.isdir(file_storage_dir):
3025
      _Fail("Specified Storage directory '%s' is not a directory",
3026
            file_storage_dir)
3027
    # deletes dir only if empty, otherwise we want to fail the rpc call
3028
    try:
3029
      os.rmdir(file_storage_dir)
3030
    except OSError, err:
3031
      _Fail("Cannot remove file storage directory '%s': %s",
3032
            file_storage_dir, err)
3033

    
3034

    
3035
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: None
  @raises RPCFail: if the rename cannot be performed

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


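# Usage sketch for the three directory helpers above (the paths are
# hypothetical and must lie below the cluster-wide file storage
# directory; failures are reported by raising RPCFail, hence the
# commented-out form):
#
#   CreateFileStorageDir("/srv/ganeti/file-storage/web")
#   RenameFileStorageDir("/srv/ganeti/file-storage/web",
#                        "/srv/ganeti/file-storage/www")
#   RemoveFileStorageDir("/srv/ganeti/file-storage/www")

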
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: None
  @raises RPCFail: if the file is not inside the queue directory

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None
  @raises RPCFail: if either file name is not inside the queue directory

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)


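# Illustrative call sequence for the two queue helpers above (file names
# and payload are hypothetical; both paths must resolve to files below
# pathutils.QUEUE_DIR or the calls raise RPCFail):
#
#   JobQueueUpdate("/var/lib/ganeti/queue/job-123", compressed_payload)
#   JobQueueRename("/var/lib/ganeti/queue/job-123",
#                  "/var/lib/ganeti/queue/archive/0/job-123")

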
def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: None
  @raises RPCFail: if a device cannot be found or cannot be made
      secondary; the message contains the error details

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


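# A minimal sketch of calling the validation above (the parameter dict is
# hypothetical; an unknown or invalid parameter makes the call raise
# RPCFail with the hypervisor's error message):
#
#   ValidateHVParams(constants.HT_KVM, {"boot_order": "cd"})

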
def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


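# A minimal sketch of a full OS validation as triggered over RPC (the OS
# name and parameter values are hypothetical):
#
#   ValidateOS(True, "debootstrap+default",
#              [constants.OS_VALIDATE_PARAMETERS], {"dhcp": "yes"})

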
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


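# Usage sketch for the two functions above: create a certificate valid
# for at most one day, then remove it again; the returned name is the
# generated certificate directory's basename:
#
#   (name, cert_pem) = CreateX509Certificate(24 * 60 * 60)
#   RemoveX509Certificate(name)

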
def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will be mostly ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc
      # to not attempt truncation on an LV device; we use oflag=dsync to not
      # buffer too much memory; this means that at best, we flush every 64k,
      # which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


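# For a hypothetical 1024 MiB disk exposed at /dev/drbd0, the raw-disk
# branch above builds roughly these pipeline fragments:
#
#   export: prefix   = "dd if=/dev/drbd0 bs=1048576 count=1024 |"
#           exp_size = 1024
#   import: suffix   = "| dd of=/dev/drbd0 conv=nocreat,notrunc"
#                      " bs=65536 oflag=dsync"

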
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Cannot specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: A list with the state of each named import/export, with
           None for those whose status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


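# Sketch of the daemon lifecycle tied together by the functions above:
# StartImportExportDaemon returns the status directory's basename, which
# serves as the handle for polling, aborting and cleanup (most arguments
# elided here):
#
#   name = StartImportExportDaemon(constants.IEM_EXPORT, ...)
#   [status] = GetImportExportStatus([name])
#   AbortImportExport(name)    # only on user abort
#   CleanupImportExport(name)

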
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will decide
  # it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


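# Note on the retry above: assuming utils.Retry's (start, factor, limit)
# delay convention, the tuple (0.1, 1.5, 5.0) spaces the attempts 0.1s
# apart initially, growing by a factor of 1.5 up to a 5s cap, and the
# 2 * 60 argument raises RetryTimeout after two minutes overall.

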
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


def GetDrbdUsermodeHelper():
  """Returns the currently configured DRBD usermode helper.

  """
  try:
    return drbd.DRBD8.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


def _VerifyRestrictedCmdName(cmd):
  """Verifies a restricted command name.

  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  if not cmd.strip():
    return (False, "Missing command name")

  if os.path.basename(cmd) != cmd:
    return (False, "Invalid command name")

  if not constants.EXT_PLUGIN_MASK.match(cmd):
    return (False, "Command name contains forbidden characters")

  return (True, None)


def _CommonRestrictedCmdCheck(path, owner):
  """Common checks for restricted command file system directories and files.

  @type path: string
  @param path: Path to check
  @param owner: C{None} or tuple containing UID and GID
  @rtype: tuple; (boolean, string or C{os.stat} result)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's the result of C{os.stat}

  """
  if owner is None:
    # Default to root as owner
    owner = (0, 0)

  try:
    st = os.stat(path)
  except EnvironmentError, err:
    return (False, "Can't stat(2) '%s': %s" % (path, err))

  if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
    return (False, "Permissions on '%s' are too permissive" % path)

  if (st.st_uid, st.st_gid) != owner:
    (owner_uid, owner_gid) = owner
    return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))

  return (True, st)


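# Worked example of the mode check above: _RCMD_MAX_MODE is 0755, so a
# command installed with mode 0775 is rejected because
# 0775 & ~0755 == 0020 (group-writable), while 0750 or 0755 passes.

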
def _VerifyRestrictedCmdDirectory(path, _owner=None):
  """Verifies restricted command directory.

  @type path: string
  @param path: Path to check
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  (status, value) = _CommonRestrictedCmdCheck(path, _owner)

  if not status:
    return (False, value)

  if not stat.S_ISDIR(value.st_mode):
    return (False, "Path '%s' is not a directory" % path)

  return (True, None)


def _VerifyRestrictedCmd(path, cmd, _owner=None):
  """Verifies a whole restricted command and returns its executable filename.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise the second element is the
    absolute path to the executable

  """
  executable = utils.PathJoin(path, cmd)

  (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)

  if not status:
    return (False, msg)

  if not utils.IsExecutable(executable):
    return (False, "access(2) thinks '%s' can't be executed" % executable)

  return (True, executable)


def _PrepareRestrictedCmd(path, cmd,
                          _verify_dir=_VerifyRestrictedCmdDirectory,
                          _verify_name=_VerifyRestrictedCmdName,
                          _verify_cmd=_VerifyRestrictedCmd):
  """Performs a number of tests on a restricted command.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @return: Same as L{_VerifyRestrictedCmd}

  """
  # Verify the directory first
  (status, msg) = _verify_dir(path)
  if status:
    # Check command if everything was alright
    (status, msg) = _verify_name(cmd)

  if not status:
    return (False, msg)

  # Check actual executable
  return _verify_cmd(path, cmd)


def RunRestrictedCmd(cmd,
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
                     _sleep_fn=time.sleep,
                     _prepare_fn=_PrepareRestrictedCmd,
                     _runcmd_fn=utils.RunCmd,
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
  """Executes a restricted command after performing strict tests.

  @type cmd: string
  @param cmd: Command name
  @rtype: string
  @return: Command output
  @raise RPCFail: In case of an error

  """
  logging.info("Preparing to run restricted command '%s'", cmd)

  if not _enabled:
    _Fail("Restricted commands disabled at configure time")

  lock = None
  try:
    cmdresult = None
    try:
      lock = utils.FileLock.Open(_lock_file)
      lock.Exclusive(blocking=True, timeout=_lock_timeout)

      (status, value) = _prepare_fn(_path, cmd)

      if status:
        cmdresult = _runcmd_fn([value], env={}, reset_env=True,
                               postfork_fn=lambda _: lock.Unlock())
      else:
        logging.error(value)
    except Exception: # pylint: disable=W0703
      # Keep original error in log
      logging.exception("Caught exception")

    if cmdresult is None:
      logging.info("Sleeping for %0.1f seconds before returning",
                   _RCMD_INVALID_DELAY)
      _sleep_fn(_RCMD_INVALID_DELAY)

      # Do not include original error message in returned error
      _Fail("Executing command '%s' failed" % cmd)
    elif cmdresult.failed or cmdresult.fail_reason:
      _Fail("Restricted command '%s' failed: %s; output: %s",
            cmd, cmdresult.fail_reason, cmdresult.output)
    else:
      return cmdresult.output
  finally:
    if lock is not None:
      # Release the lock in the end
      lock.Close()
      lock = None


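# Usage sketch (the command name is hypothetical; the executable must
# live in pathutils.RESTRICTED_COMMANDS_DIR, be owned by root and pass
# the checks above, otherwise RPCFail is raised after the delay):
#
#   output = RunRestrictedCmd("gather-debug-info")

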
def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    logging.info("Received request to no longer pause watcher")
    utils.RemoveFile(_filename)
  else:
    logging.info("Received request to pause watcher until %s", until)

    if not ht.TNumber(until):
      _Fail("Duration must be numeric")

    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)


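# Usage sketch: pause the watcher for one hour, then lift the pause
# again (the timestamp is plain Unix time):
#
#   SetWatcherPause(time.time() + 3600)
#   SetWatcherPause(None)

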
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


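# A hypothetical on-disk layout consumed by HooksRunner.RunHooks above,
# for hpath "instance-start" and the "pre" phase under the default base
# directory: each executable script is run via utils.RunParts and
# reported as a (path, HKR_*, output) tuple.
#
#   /etc/ganeti/hooks/instance-start-pre.d/10-check-quota
#   /etc/ganeti/hooks/instance-start-pre.d/20-log-start

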
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script

    @raise RPCFail: if the allocator module is not found or fails

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath

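  # Example of the conversion above (with _ROOT_DIR abbreviated to "..."):
  #
  #   "/dev/drbd0"       -> ".../bdev_drbd0"
  #   "/dev/xenvg/disk0" -> ".../bdev_xenvg_disk0"
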
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)