#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti.storage import bdev
from ganeti.storage import drbd
from ganeti.storage import filestorage
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster
from ganeti import ht
from ganeti.storage.base import BlockDev
from ganeti.storage.drbd import DRBD8
from ganeti import hooksmaster


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
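# Example (illustrative): the regex above matches one line of
# "lvs --noheadings --separator=|" output, e.g.
#   "  xenvg|test1|20.06|-wi-ao----"
# capturing the VG name, LV name, size (in MiB) and the lv_attr field.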


# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
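# With _RCMD_INVALID_DELAY = 10, the lock timeout works out to
# 10 * 0.8 = 8 seconds.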


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _GetInstReasonFilename(instance_name):
  """Path of the file containing the reason of the instance status change.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)


def _StoreInstReasonTrail(instance_name, trail):
  """Serialize a reason trail related to an instance change of state to file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: None

  """
  json = serializer.DumpJson(trail)
  filename = _GetInstReasonFilename(instance_name)
  utils.WriteFile(filename, data=json)


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
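# Example (illustrative): _Fail("Instance %s not found", iname, exc=True)
# interpolates the arguments, logs the message with a traceback (because of
# exc=True) and raises RPCFail; passing log=False skips the logging entirely.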


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
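# Example (illustrative): a zlib+base64 payload as produced by the RPC client
# round-trips like this:
#   packed = (constants.RPC_ENCODING_ZLIB_BASE64,
#             base64.b64encode(zlib.compress("some data")))
#   assert _Decompress(packed) == "some data"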


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    _RunLocalHooks decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env
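# Example (illustrative): for an IPv4 master on eth0 with IP 192.0.2.1 and
# netmask 24, the resulting hook environment would be
#   {"MASTER_NETDEV": "eth0", "MASTER_IP": "192.0.2.1",
#    "MASTER_NETMASK": "24", "CLUSTER_IP_VERSION": "4"}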


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      raise RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
                    " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      raise RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
                    " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    raise RPCFail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _GetVgInfo(name, excl_stor):
  """Retrieves information about an LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = bdev.LogicalVolume.GetVGInfo([name], excl_stor)
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "type": constants.ST_LVM_VG,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }


def _GetVgSpindlesInfo(name, excl_stor):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary with keys "type", "name", "storage_free" and
      "storage_size", giving the storage type, the VG name, the free
      spindles and the total spindles respectively

  """
  if excl_stor:
    (vg_free, vg_size) = bdev.LogicalVolume.GetVgSpindlesInfo(name)
  else:
    vg_free = 0
    vg_size = 0
  return {
    "type": constants.ST_LVM_PV,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }


def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total amount of ram in MiB
    - hv_version: the hypervisor version, if available

  @type hvparams: dict of string
  @param hvparams: the hypervisor's hvparams

  """
  return get_hv_fn(name).GetNodeInfo(hvparams=hvparams)


def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information for all hypervisors.

  See C{_GetHvInfo} for information on the output.

  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams

  """
  if hv_specs is None:
    return None

  result = []
  for hvname, hvparams in hv_specs:
    result.append(_GetHvInfo(hvname, hvparams, get_hv_fn))
  return result


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list of results.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(storage_units, hv_specs):
  """Gives back a hash with different information about the node.

  @type storage_units: list of tuples (string, string, list)
  @param storage_units: List of tuples (storage unit, identifier, parameters) to
    ask for disk space information. In case of lvm-vg, the identifier is
    the VG name. The parameters can contain additional, storage-type-specific
    parameters, for example exclusive storage for lvm storage.
  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  storage_info = _GetNamedNodeInfo(
    storage_units,
    (lambda (storage_type, storage_key, storage_params):
        _ApplyStorageInfoFunction(storage_type, storage_key, storage_params)))
  hv_info = _GetHvInfoAll(hv_specs)
  return (bootid, storage_info, hv_info)


# pylint: disable=W0613
def _GetFileStorageSpaceInfo(path, *args):
  """Wrapper around filestorage.GetSpaceInfo.

  The purpose of this wrapper is to call filestorage.GetFileStorageSpaceInfo
  and ignore the *args parameter to not leak it into the filestorage
  module's code.

  @see: C{filestorage.GetFileStorageSpaceInfo} for description of the
    parameters.

  """
  return filestorage.GetFileStorageSpaceInfo(path)


# FIXME: implement storage reporting for all missing storage types.
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: _GetFileStorageSpaceInfo,
  constants.ST_LVM_PV: _GetVgSpindlesInfo,
  constants.ST_LVM_VG: _GetVgInfo,
  constants.ST_RADOS: None,
}


def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the results of the application of the storage space function (see
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
    storage type
  @raises NotImplementedError: for storage types that don't support space
    reporting yet
  """
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
  if fn is not None:
    return fn(storage_key, *args)
  else:
    raise NotImplementedError
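# Example (illustrative): for constants.ST_LVM_VG this dispatches to
# _GetVgInfo(storage_key, *args); for a type mapped to None in
# _STORAGE_TYPE_INFO_FN (e.g. constants.ST_EXT) it raises NotImplementedError.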


def _CheckExclusivePvs(pvi_list):
  """Check that PVs are not shared among LVs

  @type pvi_list: list of L{objects.LvmPvInfo} objects
  @param pvi_list: information about the PVs

  @rtype: list of tuples (string, list of strings)
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])

  """
  res = []
  for pvi in pvi_list:
    if len(pvi.lv_list) > 1:
      res.append((pvi.name, pvi.lv_list))
  return res
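# Example (illustrative): a PV "/dev/sda1" hosting LVs "lv1" and "lv2" is
# reported as [("/dev/sda1", ["lv1", "lv2"])]; PVs with at most one LV are
# not considered offending.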


def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
                       get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hypervisor. Appends the results to the 'results' list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      hvparams = all_hvparams[hv_name]
      try:
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      result[constants.NV_HYPERVISOR][hv_name] = val


def _VerifyHvparams(what, vm_capable, result,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hvparams. Appends the results to the 'results' list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HVPARAMS in what:
    result[constants.NV_HVPARAMS] = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        get_hv_fn(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))


def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
  """Verifies the instance list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
                            all_hvparams=all_hvparams)
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val


def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
  """Verifies the node info.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_HVINFO in what and vm_capable:
    hvname = what[constants.NV_HVINFO]
    hyper = hypervisor.GetHypervisor(hvname)
    hvparams = all_hvparams[hvname]
    result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)


def VerifyNode(what, cluster_name, all_hvparams):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
  _VerifyHvparams(what, vm_capable, result)

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  _VerifyInstanceList(what, vm_capable, result, all_hvparams)

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
    result[constants.NV_FILE_STORAGE_PATHS] = \
      bdev.ComputeWrongFileStoragePaths()

  return result


def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs
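# Example (illustrative): "blockdev --getsize64" prints the device size in
# bytes, so a 1 GiB device reports 1073741824, which is stored above as
# 1073741824 / (1024 * 1024) = 1024 MiB.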


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs
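# Example (illustrative): for an lv_attr value of "-wi-ao----", attr[0] ("-")
# means the LV is not virtual, attr[4] ("a") that it is active and attr[5]
# ("o") that it is open, so the LV is reported as (size, False, True).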


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceListForHypervisor(hname, hvparams=None,
                                 get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances of the given hypervisor.

  @type hname: string
  @param hname: name of the hypervisor
  @type hvparams: dict of strings
  @param hvparams: hypervisor parameters for the given hypervisor
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  try:
    hv = get_hv_fn(hname)
    names = hv.ListInstances(hvparams=hvparams)
    results.extend(names)
  except errors.HypervisorError, err:
    _Fail("Error enumerating instances (hypervisor %s): %s",
          hname, err, exc=True)
  return results


def GetInstanceList(hypervisor_list, all_hvparams=None,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor types to respective
    cluster-wide hypervisor parameters
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
                                                get_hv_fn=get_hv_fn))
  return results


def GetInstanceInfo(instance, hname, hvparams=None):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance
  @type hvparams: dict of strings
  @param hvparams: the instance's hvparams

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance,
                                                          hvparams=hvparams)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["vcpus"] = iinfo[3]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Computes whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @raise RPCFail: if the instance is not running

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances(instance.hvparams):
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list, all_hvparams):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: mapping of hypervisor names to hvparams

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo(hvparams)
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
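# Example (illustrative): _InstanceLogName("add", "debootstrap",
# "instance1.example.com", None) yields a path of the form
# <LOG_OS_DIR>/add-debootstrap-instance1.example.com-<timestamp>.log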


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Returns symlink path for block device.

  """
  if _dir is None:
    _dir = pathutils.DISK_LINKS_DIR

  return utils.PathJoin(_dir,
                        ("%s%s%s" %
                         (instance_name, constants.DISK_SEPARATOR, idx)))
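# Example (illustrative, assuming the default ":" disk separator):
# _GetBlockDevSymlinkPath("instance1.example.com", 0) returns
# <DISK_LINKS_DIR>/instance1.example.com:0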
1496

    
1497

    
1498
def _SymlinkBlockDev(instance_name, device_path, idx):
1499
  """Set up symlinks to a instance's block device.
1500

1501
  This is an auxiliary function run when an instance is start (on the primary
1502
  node) or when an instance is migrated (on the target node).
1503

1504

1505
  @param instance_name: the name of the target instance
1506
  @param device_path: path of the physical block device, on the node
1507
  @param idx: the disk index
1508
  @return: absolute path to the disk's symlink
1509

1510
  """
1511
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1512
  try:
1513
    os.symlink(device_path, link_name)
1514
  except OSError, err:
1515
    if err.errno == errno.EEXIST:
1516
      if (not os.path.islink(link_name) or
1517
          os.readlink(link_name) != device_path):
1518
        os.remove(link_name)
1519
        os.symlink(device_path, link_name)
1520
    else:
1521
      raise
1522

    
1523
  return link_name
1524

    
1525

    
1526
def _RemoveBlockDevLinks(instance_name, disks):
1527
  """Remove the block device symlinks belonging to the given instance.
1528

1529
  """
1530
  for idx, _ in enumerate(disks):
1531
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1532
    if os.path.islink(link_name):
1533
      try:
1534
        os.remove(link_name)
1535
      except OSError:
1536
        logging.exception("Can't remove symlink '%s'", link_name)
1537

    
1538

    
1539
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices


def StartInstance(instance, startup_paused, reason, store_reason=True):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @type reason: list of reasons
  @param reason: the reason trail for this startup
  @type store_reason: boolean
  @param store_reason: whether to store the startup reason trail on file
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
    if store_reason:
      _StoreInstReasonTrail(instance.name, reason)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout, reason, store_reason=True):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this shutdown
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances(instance.hvparams):
    logging.info("Instance %s not running, doing nothing", iname)
    return

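  # Helper called repeatedly by utils.Retry below: each attempt requests a
  # soft stop from the hypervisor (with retry=True from the second attempt
  # on) and raises RetryAgain while the instance is still listed; only
  # after the retries time out does the code below escalate to a forced
  # stop.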
  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances(instance.hvparams):
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
        if store_reason:
          _StoreInstReasonTrail(instance.name, reason)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances(instance.hvparams):
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances(instance.hvparams):
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances(instance.hvparams):
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this reboot
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
      result = StartInstance(instance, False, reason, store_reason=False)
      _StoreInstReasonTrail(instance.name, reason)
      return result
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances(instance.hvparams)
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(cluster_name, instance, target, live):
  """Migrates an instance to another node.

  @type cluster_name: string
  @param cluster_name: name of the cluster
  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(cluster_name, instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status.

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

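  # The resulting invocation is of the form (placeholders illustrative):
  #   dd if=/dev/zero seek=<offset> bs=1048576 oflag=direct of=<path> \
  #      count=<size>
  # i.e. both seek and count are interpreted in 1 MiB blocks.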
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)


def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe are bigger than device size")
  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disks object we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
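    # E.g. (illustrative): with 2 children and ChildrenNeeded() == 1, at
    # most one child may fail to assemble; with ChildrenNeeded() == -1
    # every child is strictly required.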
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that, as opposed to assemble,
  we don't cache the children: shutting down one device doesn't require
  that the upper device be active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           sync information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetdimensions(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the pair (size, spindles), where spindles is None if the
      device doesn't support that

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualDimensions())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt to truncate an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.SSH_LOGIN_USER,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
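
  # The API file simply lists one supported API version per line; e.g.
  # (illustrative) a file containing the single line "20" declares support
  # for OS API version 20.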

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

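  # The variants file lists one supported variant per line; empty lines
  # and "#" comments are filtered out below.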
  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result


def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
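  # Each disk is exported as a set of DISK_<idx>_* variables; e.g.
  # (illustrative) the first disk of a DRBD-based instance might yield
  # DISK_0_PATH=/dev/drbd0 and DISK_0_ACCESS=rw.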
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    result["DISK_%d_UUID" % idx] = disk.uuid
    if disk.name:
      result["DISK_%d_NAME" % idx] = disk.name
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    result["NIC_%d_UUID" % idx] = nic.uuid
    if nic.name:
      result["NIC_%d_NAME" % idx] = nic.name
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.netinfo:
      nobj = objects.Network.FromDict(nic.netinfo)
      result.update(nobj.HooksDict("NIC_%d_" % idx))
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list of L{objects.ExtStorage}
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result


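# Note: growing a disk stack is typically driven in two passes, first with
# backingstore=True (e.g. the LVs underneath a DRBD device) and then with
# backingstore=False (the DRBD device itself); this describes the usual
# call pattern and is not enforced here.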
def BlockdevGrow(disk, amount, dryrun, backingstore, excl_stor):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk to operate on
  @type info: string
  @param info: new 'info' metadata
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()
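  # The export info is an INI-style file; the result looks roughly like
  # this (illustrative values, assuming the usual section names behind
  # INISECT_EXP and INISECT_INS):
  #   [export]
  #   version = 0
  #   timestamp = 1357986420
  #   source = node1.example.com
  #   os = debootstrap
  #   [instance]
  #   name = inst1.example.com
  #   ...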
2960

    
2961
  config.add_section(constants.INISECT_EXP)
2962
  config.set(constants.INISECT_EXP, "version", "0")
2963
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2964
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
2965
  config.set(constants.INISECT_EXP, "os", instance.os)
2966
  config.set(constants.INISECT_EXP, "compression", "none")
2967

    
2968
  config.add_section(constants.INISECT_INS)
2969
  config.set(constants.INISECT_INS, "name", instance.name)
2970
  config.set(constants.INISECT_INS, "maxmem", "%d" %
2971
             instance.beparams[constants.BE_MAXMEM])
2972
  config.set(constants.INISECT_INS, "minmem", "%d" %
2973
             instance.beparams[constants.BE_MINMEM])
2974
  # "memory" is deprecated, but useful for exporting to old ganeti versions
2975
  config.set(constants.INISECT_INS, "memory", "%d" %
2976
             instance.beparams[constants.BE_MAXMEM])
2977
  config.set(constants.INISECT_INS, "vcpus", "%d" %
2978
             instance.beparams[constants.BE_VCPUS])
2979
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2980
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2981
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2982

    
2983
  nic_total = 0
2984
  for nic_count, nic in enumerate(instance.nics):
2985
    nic_total += 1
2986
    config.set(constants.INISECT_INS, "nic%d_mac" %
2987
               nic_count, "%s" % nic.mac)
2988
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2989
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
2990
               "%s" % nic.network)
2991
    for param in constants.NICS_PARAMETER_TYPES:
2992
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2993
                 "%s" % nic.nicparams.get(param, None))
2994
  # TODO: redundant: on load can read nics until it doesn't exist
2995
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2996

    
2997
  disk_total = 0
2998
  for disk_count, disk in enumerate(snap_disks):
2999
    if disk:
3000
      disk_total += 1
3001
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
3002
                 ("%s" % disk.iv_name))
3003
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
3004
                 ("%s" % disk.physical_id[1]))
3005
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
3006
                 ("%d" % disk.size))
3007

    
3008
  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
3009

    
3010
  # New-style hypervisor/backend parameters
3011

    
3012
  config.add_section(constants.INISECT_HYP)
3013
  for name, value in instance.hvparams.items():
3014
    if name not in constants.HVC_GLOBALS:
3015
      config.set(constants.INISECT_HYP, name, str(value))
3016

    
3017
  config.add_section(constants.INISECT_BEP)
3018
  for name, value in instance.beparams.items():
3019
    config.set(constants.INISECT_BEP, name, str(value))
3020

    
3021
  config.add_section(constants.INISECT_OSP)
3022
  for name, value in instance.osparams.items():
3023
    config.set(constants.INISECT_OSP, name, str(value))
3024

    
3025
  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
3026
                  data=config.Dumps())
3027
  shutil.rmtree(finaldestdir, ignore_errors=True)
3028
  shutil.move(destdir, finaldestdir)
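
# An illustrative sketch of the export info file written above (section
# names assume the stock values of the constants.INISECT_* constants; all
# values below are made up):
#
#   [export]
#   version = 0
#   timestamp = 1357920000
#   source = node1.example.com
#   os = debootstrap
#   compression = none
#
#   [instance]
#   name = inst1.example.com
#   maxmem = 1024
#   minmem = 1024
#   memory = 1024
#   vcpus = 1
#   nic0_mac = aa:00:00:11:22:33
#   nic_count = 1
#   disk0_ivname = disk/0
#   disk0_dump = disk0.snap
#   disk0_size = 10240
#   disk_count = 1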


def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the new logical/physical ID we rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether the given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not (constants.ENABLE_FILE_STORAGE or
          constants.ENABLE_SHARED_FILE_STORAGE):
    _Fail("File storage disabled at configure time")

  bdev.CheckFileStoragePath(fs_dir)

  return os.path.normpath(fs_dir)
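
# For example (paths hypothetical): with a cluster-wide file_storage_dir of
# /srv/ganeti/file-storage, "/srv/ganeti/file-storage/inst1" is accepted and
# returned normalized, while something like "/tmp/inst1" is rejected by
# bdev.CheckFileStoragePath and the RPC call fails.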


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, fail the RPC call.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should clean up
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates whether the operation succeeded, and message
      contains the error details in case it failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
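
# E.g. for a certificate named "x509-20130101010101-abcd" (hypothetical),
# this returns <cryptodir>/<name> plus the "key" and "cert" files inside
# that directory, per _X509_KEY_FILE and _X509_CERT_FILE above.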


def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set a smaller block size here because, due to transport buffering,
      # more than 64-128k would mostly be ignored; we use nocreat to fail if
      # the device is not already there or we pass a wrong path; we use
      # notrunc to not attempt truncation on an LV device; we use oflag=dsync
      # to not buffer too much memory; this means that at best, we flush
      # every 64k, which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MiB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)
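
# Illustrative results (device and file paths hypothetical): exporting a
# 10240 MiB disk via IEIO_RAW_DISK yields roughly
#   prefix = "dd if=/dev/xenvg/disk0 bs=1048576 count=10240 |"
#   suffix = None
# while an IEIO_FILE import yields
#   prefix = None
#   suffix = "> /srv/ganeti/export/inst1.example.com.new/disk0.snap"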


def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]
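
    # An illustrative sketch of the list built above (paths hypothetical):
    #   [pathutils.IMPORT_EXPORT_DAEMON, ".../status", "import",
    #    "--key=/var/lib/ganeti/server.pem", "--cert=...", "--ca=.../ca"]
    # with the transport and I/O options appended below.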

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None
           if a status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _SetPhysicalId(target_node_uuid, nodes_ip, disks):
  """Sets the correct physical ID on all passed disks.

  """
  for cf in disks:
    cf.SetPhysicalID(target_node_uuid, nodes_ip)


def _FindDisks(target_node_uuid, nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  _SetPhysicalId(target_node_uuid, nodes_ip, disks)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(target_node_uuid, nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(target_node_uuid, nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(target_node_uuid, nodes_ip, disks, instance_name,
                  multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(target_node_uuid, nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will
  # decide it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
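    # (reading the arguments below: the (0.1, 1.5, 5.0) tuple is the retry
    # delay spec - start at 0.1s, grow by a factor of 1.5 per attempt, cap
    # at 5.0s - and 2 * 60 is the overall timeout in seconds)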
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(target_node_uuid, nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(target_node_uuid, nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
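
# E.g. (illustrative values) a result of (True, 100) means all devices are
# connected and fully synced, while (False, 73) means at least one device is
# still resyncing and the least-synced one is about 73% done.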


def DrbdNeedsActivation(target_node_uuid, nodes_ip, disks):
  """Checks which of the passed disks needs activation and returns their UUIDs.

  """
  _SetPhysicalId(target_node_uuid, nodes_ip, disks)
  faulty_disks = []

  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      faulty_disks.append(disk)
      continue

    stats = rd.GetProcStatus()
    if stats.is_standalone or stats.is_diskless:
      faulty_disks.append(disk)

  return [disk.uuid for disk in faulty_disks]


def GetDrbdUsermodeHelper():
  """Returns the DRBD usermode helper currently configured.

  """
  try:
    return drbd.DRBD8.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type, hvparams=None):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode(hvparams=hvparams)


def _VerifyRestrictedCmdName(cmd):
  """Verifies a restricted command name.

  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  if not cmd.strip():
    return (False, "Missing command name")

  if os.path.basename(cmd) != cmd:
    return (False, "Invalid command name")

  if not constants.EXT_PLUGIN_MASK.match(cmd):
    return (False, "Command name contains forbidden characters")

  return (True, None)


def _CommonRestrictedCmdCheck(path, owner):
  """Common checks for restricted command file system directories and files.

  @type path: string
  @param path: Path to check
  @param owner: C{None} or tuple containing UID and GID
  @rtype: tuple; (boolean, string or C{os.stat} result)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's the result of C{os.stat}

  """
  if owner is None:
    # Default to root as owner
    owner = (0, 0)

  try:
    st = os.stat(path)
  except EnvironmentError, err:
    return (False, "Can't stat(2) '%s': %s" % (path, err))

  if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
    return (False, "Permissions on '%s' are too permissive" % path)

  if (st.st_uid, st.st_gid) != owner:
    (owner_uid, owner_gid) = owner
    return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))

  return (True, st)


def _VerifyRestrictedCmdDirectory(path, _owner=None):
  """Verifies restricted command directory.

  @type path: string
  @param path: Path to check
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  (status, value) = _CommonRestrictedCmdCheck(path, _owner)

  if not status:
    return (False, value)

  if not stat.S_ISDIR(value.st_mode):
    return (False, "Path '%s' is not a directory" % path)

  return (True, None)


def _VerifyRestrictedCmd(path, cmd, _owner=None):
  """Verifies a whole restricted command and returns its executable filename.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise the second element is the
    absolute path to the executable

  """
  executable = utils.PathJoin(path, cmd)

  (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)

  if not status:
    return (False, msg)

  if not utils.IsExecutable(executable):
    return (False, "access(2) thinks '%s' can't be executed" % executable)

  return (True, executable)


def _PrepareRestrictedCmd(path, cmd,
                          _verify_dir=_VerifyRestrictedCmdDirectory,
                          _verify_name=_VerifyRestrictedCmdName,
                          _verify_cmd=_VerifyRestrictedCmd):
  """Performs a number of tests on a restricted command.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @return: Same as L{_VerifyRestrictedCmd}

  """
  # Verify the directory first
  (status, msg) = _verify_dir(path)
  if status:
    # Check command if everything was alright
    (status, msg) = _verify_name(cmd)

  if not status:
    return (False, msg)

  # Check actual executable
  return _verify_cmd(path, cmd)


def RunRestrictedCmd(cmd,
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
                     _sleep_fn=time.sleep,
                     _prepare_fn=_PrepareRestrictedCmd,
                     _runcmd_fn=utils.RunCmd,
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
  """Executes a restricted command after performing strict tests.

  @type cmd: string
  @param cmd: Command name
  @rtype: string
  @return: Command output
  @raise RPCFail: In case of an error

  """
  logging.info("Preparing to run restricted command '%s'", cmd)

  if not _enabled:
    _Fail("Restricted commands disabled at configure time")

  lock = None
  try:
    cmdresult = None
    try:
      lock = utils.FileLock.Open(_lock_file)
      lock.Exclusive(blocking=True, timeout=_lock_timeout)
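
      # Note: this lock serializes restricted command execution on the
      # node; it is released through the postfork_fn callback below, i.e.
      # only once the command's process has actually been forked, so the
      # checks and the start of the command happen atomically with respect
      # to other requests.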

      (status, value) = _prepare_fn(_path, cmd)

      if status:
        cmdresult = _runcmd_fn([value], env={}, reset_env=True,
                               postfork_fn=lambda _: lock.Unlock())
      else:
        logging.error(value)
    except Exception: # pylint: disable=W0703
      # Keep original error in log
      logging.exception("Caught exception")

    if cmdresult is None:
      logging.info("Sleeping for %0.1f seconds before returning",
                   _RCMD_INVALID_DELAY)
      _sleep_fn(_RCMD_INVALID_DELAY)

      # Do not include original error message in returned error
      _Fail("Executing command '%s' failed" % cmd)
    elif cmdresult.failed or cmdresult.fail_reason:
      _Fail("Restricted command '%s' failed: %s; output: %s",
            cmd, cmdresult.fail_reason, cmdresult.output)
    else:
      return cmdresult.output
  finally:
    if lock is not None:
      # Finally release the lock
      lock.Close()
      lock = None


def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    logging.info("Received request to no longer pause watcher")
    utils.RemoveFile(_filename)
  else:
    logging.info("Received request to pause watcher until %s", until)

    if not ht.TNumber(until):
      _Fail("Duration must be numeric")

    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)
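
    # E.g. (hook path hypothetical): hpath "instance-start" with the "pre"
    # phase maps to the directory <hooks_base_dir>/instance-start-pre.d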

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator (failures are reported by
        raising an RPC failure)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath
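
    # E.g. (device path hypothetical): "/dev/xenvg/inst1-disk0" maps to
    # the cache file <pathutils.BDEV_CACHE_DIR>/bdev_xenvg_inst1-disk0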

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)