Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 7e8841bd

History | View | Annotate | Download (84.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26

27
"""
28

    
29

    
30
import os
31
import os.path
32
import shutil
33
import time
34
import stat
35
import errno
36
import re
37
import subprocess
38
import random
39
import logging
40
import tempfile
41
import zlib
42
import base64
43

    
44
from ganeti import errors
45
from ganeti import utils
46
from ganeti import ssh
47
from ganeti import hypervisor
48
from ganeti import constants
49
from ganeti import bdev
50
from ganeti import objects
51
from ganeti import ssconf
52

    
53

    
54
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55

    
56

    
57
class RPCFail(Exception):
58
  """Class denoting RPC failure.
59

60
  Its argument is the error message.
61

62
  """
63

    
64

    
65
def _Fail(msg, *args, **kwargs):
66
  """Log an error and the raise an RPCFail exception.
67

68
  This exception is then handled specially in the ganeti daemon and
69
  turned into a 'failed' return type. As such, this function is a
70
  useful shortcut for logging the error and returning it to the master
71
  daemon.
72

73
  @type msg: string
74
  @param msg: the text of the exception
75
  @raise RPCFail
76

77
  """
78
  if args:
79
    msg = msg % args
80
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
81
    if "exc" in kwargs and kwargs["exc"]:
82
      logging.exception(msg)
83
    else:
84
      logging.error(msg)
85
  raise RPCFail(msg)
86

    
87

    
88
def _GetConfig():
89
  """Simple wrapper to return a SimpleStore.
90

91
  @rtype: L{ssconf.SimpleStore}
92
  @return: a SimpleStore instance
93

94
  """
95
  return ssconf.SimpleStore()
96

    
97

    
98
def _GetSshRunner(cluster_name):
99
  """Simple wrapper to return an SshRunner.
100

101
  @type cluster_name: str
102
  @param cluster_name: the cluster name, which is needed
103
      by the SshRunner constructor
104
  @rtype: L{ssh.SshRunner}
105
  @return: an SshRunner instance
106

107
  """
108
  return ssh.SshRunner(cluster_name)
109

    
110

    
111
def _Decompress(data):
112
  """Unpacks data compressed by the RPC client.
113

114
  @type data: list or tuple
115
  @param data: Data sent by RPC client
116
  @rtype: str
117
  @return: Decompressed data
118

119
  """
120
  assert isinstance(data, (list, tuple))
121
  assert len(data) == 2
122
  (encoding, content) = data
123
  if encoding == constants.RPC_ENCODING_NONE:
124
    return content
125
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
126
    return zlib.decompress(base64.b64decode(content))
127
  else:
128
    raise AssertionError("Unknown data encoding")
129

    
130

    
131
def _CleanDirectory(path, exclude=None):
132
  """Removes all regular files in a directory.
133

134
  @type path: str
135
  @param path: the directory to clean
136
  @type exclude: list
137
  @param exclude: list of files to be excluded, defaults
138
      to the empty list
139

140
  """
141
  if not os.path.isdir(path):
142
    return
143
  if exclude is None:
144
    exclude = []
145
  else:
146
    # Normalize excluded paths
147
    exclude = [os.path.normpath(i) for i in exclude]
148

    
149
  for rel_name in utils.ListVisibleFiles(path):
150
    full_name = os.path.normpath(os.path.join(path, rel_name))
151
    if full_name in exclude:
152
      continue
153
    if os.path.isfile(full_name) and not os.path.islink(full_name):
154
      utils.RemoveFile(full_name)
155

    
156

    
157
def _BuildUploadFileList():
158
  """Build the list of allowed upload files.
159

160
  This is abstracted so that it's built only once at module import time.
161

162
  """
163
  allowed_files = set([
164
    constants.CLUSTER_CONF_FILE,
165
    constants.ETC_HOSTS,
166
    constants.SSH_KNOWN_HOSTS_FILE,
167
    constants.VNC_PASSWORD_FILE,
168
    constants.RAPI_CERT_FILE,
169
    constants.RAPI_USERS_FILE,
170
    constants.HMAC_CLUSTER_KEY,
171
    ])
172

    
173
  for hv_name in constants.HYPER_TYPES:
174
    hv_class = hypervisor.GetHypervisorClass(hv_name)
175
    allowed_files.update(hv_class.GetAncillaryFiles())
176

    
177
  return frozenset(allowed_files)
178

    
179

    
180
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181

    
182

    
183
def JobQueuePurge():
184
  """Removes job queue files and archived jobs.
185

186
  @rtype: tuple
187
  @return: True, None
188

189
  """
190
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
191
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192

    
193

    
194
def GetMasterInfo():
195
  """Returns master information.
196

197
  This is an utility function to compute master information, either
198
  for consumption here or from the node daemon.
199

200
  @rtype: tuple
201
  @return: master_netdev, master_ip, master_name
202
  @raise RPCFail: in case of errors
203

204
  """
205
  try:
206
    cfg = _GetConfig()
207
    master_netdev = cfg.GetMasterNetdev()
208
    master_ip = cfg.GetMasterIP()
209
    master_node = cfg.GetMasterNode()
210
  except errors.ConfigurationError, err:
211
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
212
  return (master_netdev, master_ip, master_node)
213

    
214

    
215
def StartMaster(start_daemons, no_voting):
216
  """Activate local node as master node.
217

218
  The function will always try activate the IP address of the master
219
  (unless someone else has it). It will also start the master daemons,
220
  based on the start_daemons parameter.
221

222
  @type start_daemons: boolean
223
  @param start_daemons: whether to also start the master
224
      daemons (ganeti-masterd and ganeti-rapi)
225
  @type no_voting: boolean
226
  @param no_voting: whether to start ganeti-masterd without a node vote
227
      (if start_daemons is True), but still non-interactively
228
  @rtype: None
229

230
  """
231
  # GetMasterInfo will raise an exception if not able to return data
232
  master_netdev, master_ip, _ = GetMasterInfo()
233

    
234
  err_msgs = []
235
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
236
    if utils.OwnIpAddress(master_ip):
237
      # we already have the ip:
238
      logging.debug("Master IP already configured, doing nothing")
239
    else:
240
      msg = "Someone else has the master ip, not activating"
241
      logging.error(msg)
242
      err_msgs.append(msg)
243
  else:
244
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
245
                           "dev", master_netdev, "label",
246
                           "%s:0" % master_netdev])
247
    if result.failed:
248
      msg = "Can't activate master IP: %s" % result.output
249
      logging.error(msg)
250
      err_msgs.append(msg)
251

    
252
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
253
                           "-s", master_ip, master_ip])
254
    # we'll ignore the exit code of arping
255

    
256
  # and now start the master and rapi daemons
257
  if start_daemons:
258
    daemons_params = {
259
        'ganeti-masterd': [],
260
        'ganeti-rapi': [],
261
        }
262
    if no_voting:
263
      daemons_params['ganeti-masterd'].append('--no-voting')
264
      daemons_params['ganeti-masterd'].append('--yes-do-it')
265
    for daemon in daemons_params:
266
      cmd = [daemon]
267
      cmd.extend(daemons_params[daemon])
268
      result = utils.RunCmd(cmd)
269
      if result.failed:
270
        msg = "Can't start daemon %s: %s" % (daemon, result.output)
271
        logging.error(msg)
272
        err_msgs.append(msg)
273

    
274
  if err_msgs:
275
    _Fail("; ".join(err_msgs))
276

    
277

    
278
def StopMaster(stop_daemons):
279
  """Deactivate this node as master.
280

281
  The function will always try to deactivate the IP address of the
282
  master. It will also stop the master daemons depending on the
283
  stop_daemons parameter.
284

285
  @type stop_daemons: boolean
286
  @param stop_daemons: whether to also stop the master daemons
287
      (ganeti-masterd and ganeti-rapi)
288
  @rtype: None
289

290
  """
291
  # TODO: log and report back to the caller the error failures; we
292
  # need to decide in which case we fail the RPC for this
293

    
294
  # GetMasterInfo will raise an exception if not able to return data
295
  master_netdev, master_ip, _ = GetMasterInfo()
296

    
297
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
298
                         "dev", master_netdev])
299
  if result.failed:
300
    logging.error("Can't remove the master IP, error: %s", result.output)
301
    # but otherwise ignore the failure
302

    
303
  if stop_daemons:
304
    # stop/kill the rapi and the master daemon
305
    for daemon in constants.RAPI, constants.MASTERD:
306
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307

    
308

    
309
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
310
  """Joins this node to the cluster.
311

312
  This does the following:
313
      - updates the hostkeys of the machine (rsa and dsa)
314
      - adds the ssh private key to the user
315
      - adds the ssh public key to the users' authorized_keys file
316

317
  @type dsa: str
318
  @param dsa: the DSA private key to write
319
  @type dsapub: str
320
  @param dsapub: the DSA public key to write
321
  @type rsa: str
322
  @param rsa: the RSA private key to write
323
  @type rsapub: str
324
  @param rsapub: the RSA public key to write
325
  @type sshkey: str
326
  @param sshkey: the SSH private key to write
327
  @type sshpub: str
328
  @param sshpub: the SSH public key to write
329
  @rtype: boolean
330
  @return: the success of the operation
331

332
  """
333
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
334
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
335
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
336
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
337
  for name, content, mode in sshd_keys:
338
    utils.WriteFile(name, data=content, mode=mode)
339

    
340
  try:
341
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
342
                                                    mkdir=True)
343
  except errors.OpExecError, err:
344
    _Fail("Error while processing user ssh files: %s", err, exc=True)
345

    
346
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
347
    utils.WriteFile(name, data=content, mode=0600)
348

    
349
  utils.AddAuthorizedKey(auth_keys, sshpub)
350

    
351
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352

    
353

    
354
def LeaveCluster():
355
  """Cleans up and remove the current node.
356

357
  This function cleans up and prepares the current node to be removed
358
  from the cluster.
359

360
  If processing is successful, then it raises an
361
  L{errors.QuitGanetiException} which is used as a special case to
362
  shutdown the node daemon.
363

364
  """
365
  _CleanDirectory(constants.DATA_DIR)
366
  JobQueuePurge()
367

    
368
  try:
369
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
370

    
371
    utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
372

    
373
    utils.RemoveFile(priv_key)
374
    utils.RemoveFile(pub_key)
375
  except errors.OpExecError:
376
    logging.exception("Error while processing ssh files")
377

    
378
  try:
379
    utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
380
    utils.RemoveFile(constants.RAPI_CERT_FILE)
381
    utils.RemoveFile(constants.SSL_CERT_FILE)
382
  except:
383
    logging.exception("Error while removing cluster secrets")
384

    
385
  confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))
386

    
387
  if confd_pid:
388
    utils.KillProcess(confd_pid, timeout=2)
389

    
390
  # Raise a custom exception (handled in ganeti-noded)
391
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
392

    
393

    
394
def GetNodeInfo(vgname, hypervisor_type):
395
  """Gives back a hash with different information about the node.
396

397
  @type vgname: C{string}
398
  @param vgname: the name of the volume group to ask for disk space information
399
  @type hypervisor_type: C{str}
400
  @param hypervisor_type: the name of the hypervisor to ask for
401
      memory information
402
  @rtype: C{dict}
403
  @return: dictionary with the following keys:
404
      - vg_size is the size of the configured volume group in MiB
405
      - vg_free is the free size of the volume group in MiB
406
      - memory_dom0 is the memory allocated for domain0 in MiB
407
      - memory_free is the currently available (free) ram in MiB
408
      - memory_total is the total number of ram in MiB
409

410
  """
411
  outputarray = {}
412
  vginfo = _GetVGInfo(vgname)
413
  outputarray['vg_size'] = vginfo['vg_size']
414
  outputarray['vg_free'] = vginfo['vg_free']
415

    
416
  hyper = hypervisor.GetHypervisor(hypervisor_type)
417
  hyp_info = hyper.GetNodeInfo()
418
  if hyp_info is not None:
419
    outputarray.update(hyp_info)
420

    
421
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
422

    
423
  return outputarray
424

    
425

    
426
def VerifyNode(what, cluster_name):
427
  """Verify the status of the local node.
428

429
  Based on the input L{what} parameter, various checks are done on the
430
  local node.
431

432
  If the I{filelist} key is present, this list of
433
  files is checksummed and the file/checksum pairs are returned.
434

435
  If the I{nodelist} key is present, we check that we have
436
  connectivity via ssh with the target nodes (and check the hostname
437
  report).
438

439
  If the I{node-net-test} key is present, we check that we have
440
  connectivity to the given nodes via both primary IP and, if
441
  applicable, secondary IPs.
442

443
  @type what: C{dict}
444
  @param what: a dictionary of things to check:
445
      - filelist: list of files for which to compute checksums
446
      - nodelist: list of nodes we should check ssh communication with
447
      - node-net-test: list of nodes we should check node daemon port
448
        connectivity with
449
      - hypervisor: list with hypervisors to run the verify for
450
  @rtype: dict
451
  @return: a dictionary with the same keys as the input dict, and
452
      values representing the result of the checks
453

454
  """
455
  result = {}
456

    
457
  if constants.NV_HYPERVISOR in what:
458
    result[constants.NV_HYPERVISOR] = tmp = {}
459
    for hv_name in what[constants.NV_HYPERVISOR]:
460
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
461

    
462
  if constants.NV_FILELIST in what:
463
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
464
      what[constants.NV_FILELIST])
465

    
466
  if constants.NV_NODELIST in what:
467
    result[constants.NV_NODELIST] = tmp = {}
468
    random.shuffle(what[constants.NV_NODELIST])
469
    for node in what[constants.NV_NODELIST]:
470
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
471
      if not success:
472
        tmp[node] = message
473

    
474
  if constants.NV_NODENETTEST in what:
475
    result[constants.NV_NODENETTEST] = tmp = {}
476
    my_name = utils.HostInfo().name
477
    my_pip = my_sip = None
478
    for name, pip, sip in what[constants.NV_NODENETTEST]:
479
      if name == my_name:
480
        my_pip = pip
481
        my_sip = sip
482
        break
483
    if not my_pip:
484
      tmp[my_name] = ("Can't find my own primary/secondary IP"
485
                      " in the node list")
486
    else:
487
      port = utils.GetDaemonPort(constants.NODED)
488
      for name, pip, sip in what[constants.NV_NODENETTEST]:
489
        fail = []
490
        if not utils.TcpPing(pip, port, source=my_pip):
491
          fail.append("primary")
492
        if sip != pip:
493
          if not utils.TcpPing(sip, port, source=my_sip):
494
            fail.append("secondary")
495
        if fail:
496
          tmp[name] = ("failure using the %s interface(s)" %
497
                       " and ".join(fail))
498

    
499
  if constants.NV_LVLIST in what:
500
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
501

    
502
  if constants.NV_INSTANCELIST in what:
503
    result[constants.NV_INSTANCELIST] = GetInstanceList(
504
      what[constants.NV_INSTANCELIST])
505

    
506
  if constants.NV_VGLIST in what:
507
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
508

    
509
  if constants.NV_VERSION in what:
510
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
511
                                    constants.RELEASE_VERSION)
512

    
513
  if constants.NV_HVINFO in what:
514
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
515
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
516

    
517
  if constants.NV_DRBDLIST in what:
518
    try:
519
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
520
    except errors.BlockDeviceError, err:
521
      logging.warning("Can't get used minors list", exc_info=True)
522
      used_minors = str(err)
523
    result[constants.NV_DRBDLIST] = used_minors
524

    
525
  return result
526

    
527

    
528
def GetVolumeList(vg_name):
529
  """Compute list of logical volumes and their size.
530

531
  @type vg_name: str
532
  @param vg_name: the volume group whose LVs we should list
533
  @rtype: dict
534
  @return:
535
      dictionary of all partions (key) with value being a tuple of
536
      their size (in MiB), inactive and online status::
537

538
        {'test1': ('20.06', True, True)}
539

540
      in case of errors, a string is returned with the error
541
      details.
542

543
  """
544
  lvs = {}
545
  sep = '|'
546
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
547
                         "--separator=%s" % sep,
548
                         "-olv_name,lv_size,lv_attr", vg_name])
549
  if result.failed:
550
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
551

    
552
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
553
  for line in result.stdout.splitlines():
554
    line = line.strip()
555
    match = valid_line_re.match(line)
556
    if not match:
557
      logging.error("Invalid line returned from lvs output: '%s'", line)
558
      continue
559
    name, size, attr = match.groups()
560
    inactive = attr[4] == '-'
561
    online = attr[5] == 'o'
562
    virtual = attr[0] == 'v'
563
    if virtual:
564
      # we don't want to report such volumes as existing, since they
565
      # don't really hold data
566
      continue
567
    lvs[name] = (size, inactive, online)
568

    
569
  return lvs
570

    
571

    
572
def ListVolumeGroups():
573
  """List the volume groups and their size.
574

575
  @rtype: dict
576
  @return: dictionary with keys volume name and values the
577
      size of the volume
578

579
  """
580
  return utils.ListVolumeGroups()
581

    
582

    
583
def NodeVolumes():
584
  """List all volumes on this node.
585

586
  @rtype: list
587
  @return:
588
    A list of dictionaries, each having four keys:
589
      - name: the logical volume name,
590
      - size: the size of the logical volume
591
      - dev: the physical device on which the LV lives
592
      - vg: the volume group to which it belongs
593

594
    In case of errors, we return an empty list and log the
595
    error.
596

597
    Note that since a logical volume can live on multiple physical
598
    volumes, the resulting list might include a logical volume
599
    multiple times.
600

601
  """
602
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
603
                         "--separator=|",
604
                         "--options=lv_name,lv_size,devices,vg_name"])
605
  if result.failed:
606
    _Fail("Failed to list logical volumes, lvs output: %s",
607
          result.output)
608

    
609
  def parse_dev(dev):
610
    if '(' in dev:
611
      return dev.split('(')[0]
612
    else:
613
      return dev
614

    
615
  def map_line(line):
616
    return {
617
      'name': line[0].strip(),
618
      'size': line[1].strip(),
619
      'dev': parse_dev(line[2].strip()),
620
      'vg': line[3].strip(),
621
    }
622

    
623
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
624
          if line.count('|') >= 3]
625

    
626

    
627
def BridgesExist(bridges_list):
628
  """Check if a list of bridges exist on the current node.
629

630
  @rtype: boolean
631
  @return: C{True} if all of them exist, C{False} otherwise
632

633
  """
634
  missing = []
635
  for bridge in bridges_list:
636
    if not utils.BridgeExists(bridge):
637
      missing.append(bridge)
638

    
639
  if missing:
640
    _Fail("Missing bridges %s", ", ".join(missing))
641

    
642

    
643
def GetInstanceList(hypervisor_list):
644
  """Provides a list of instances.
645

646
  @type hypervisor_list: list
647
  @param hypervisor_list: the list of hypervisors to query information
648

649
  @rtype: list
650
  @return: a list of all running instances on the current node
651
    - instance1.example.com
652
    - instance2.example.com
653

654
  """
655
  results = []
656
  for hname in hypervisor_list:
657
    try:
658
      names = hypervisor.GetHypervisor(hname).ListInstances()
659
      results.extend(names)
660
    except errors.HypervisorError, err:
661
      _Fail("Error enumerating instances (hypervisor %s): %s",
662
            hname, err, exc=True)
663

    
664
  return results
665

    
666

    
667
def GetInstanceInfo(instance, hname):
668
  """Gives back the information about an instance as a dictionary.
669

670
  @type instance: string
671
  @param instance: the instance name
672
  @type hname: string
673
  @param hname: the hypervisor type of the instance
674

675
  @rtype: dict
676
  @return: dictionary with the following keys:
677
      - memory: memory size of instance (int)
678
      - state: xen state of instance (string)
679
      - time: cpu time of instance (float)
680

681
  """
682
  output = {}
683

    
684
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
685
  if iinfo is not None:
686
    output['memory'] = iinfo[2]
687
    output['state'] = iinfo[4]
688
    output['time'] = iinfo[5]
689

    
690
  return output
691

    
692

    
693
def GetInstanceMigratable(instance):
694
  """Gives whether an instance can be migrated.
695

696
  @type instance: L{objects.Instance}
697
  @param instance: object representing the instance to be checked.
698

699
  @rtype: tuple
700
  @return: tuple of (result, description) where:
701
      - result: whether the instance can be migrated or not
702
      - description: a description of the issue, if relevant
703

704
  """
705
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
706
  iname = instance.name
707
  if iname not in hyper.ListInstances():
708
    _Fail("Instance %s is not running", iname)
709

    
710
  for idx in range(len(instance.disks)):
711
    link_name = _GetBlockDevSymlinkPath(iname, idx)
712
    if not os.path.islink(link_name):
713
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
714

    
715

    
716
def GetAllInstancesInfo(hypervisor_list):
717
  """Gather data about all instances.
718

719
  This is the equivalent of L{GetInstanceInfo}, except that it
720
  computes data for all instances at once, thus being faster if one
721
  needs data about more than one instance.
722

723
  @type hypervisor_list: list
724
  @param hypervisor_list: list of hypervisors to query for instance data
725

726
  @rtype: dict
727
  @return: dictionary of instance: data, with data having the following keys:
728
      - memory: memory size of instance (int)
729
      - state: xen state of instance (string)
730
      - time: cpu time of instance (float)
731
      - vcpus: the number of vcpus
732

733
  """
734
  output = {}
735

    
736
  for hname in hypervisor_list:
737
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
738
    if iinfo:
739
      for name, _, memory, vcpus, state, times in iinfo:
740
        value = {
741
          'memory': memory,
742
          'vcpus': vcpus,
743
          'state': state,
744
          'time': times,
745
          }
746
        if name in output:
747
          # we only check static parameters, like memory and vcpus,
748
          # and not state and time which can change between the
749
          # invocations of the different hypervisors
750
          for key in 'memory', 'vcpus':
751
            if value[key] != output[name][key]:
752
              _Fail("Instance %s is running twice"
753
                    " with different parameters", name)
754
        output[name] = value
755

    
756
  return output
757

    
758

    
759
def InstanceOsAdd(instance, reinstall):
760
  """Add an OS to an instance.
761

762
  @type instance: L{objects.Instance}
763
  @param instance: Instance whose OS is to be installed
764
  @type reinstall: boolean
765
  @param reinstall: whether this is an instance reinstall
766
  @rtype: None
767

768
  """
769
  inst_os = OSFromDisk(instance.os)
770

    
771
  create_env = OSEnvironment(instance, inst_os)
772
  if reinstall:
773
    create_env['INSTANCE_REINSTALL'] = "1"
774

    
775
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
776
                                     instance.name, int(time.time()))
777

    
778
  result = utils.RunCmd([inst_os.create_script], env=create_env,
779
                        cwd=inst_os.path, output=logfile,)
780
  if result.failed:
781
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
782
                  " output: %s", result.cmd, result.fail_reason, logfile,
783
                  result.output)
784
    lines = [utils.SafeEncode(val)
785
             for val in utils.TailFile(logfile, lines=20)]
786
    _Fail("OS create script failed (%s), last lines in the"
787
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
788

    
789

    
790
def RunRenameInstance(instance, old_name):
791
  """Run the OS rename script for an instance.
792

793
  @type instance: L{objects.Instance}
794
  @param instance: Instance whose OS is to be installed
795
  @type old_name: string
796
  @param old_name: previous instance name
797
  @rtype: boolean
798
  @return: the success of the operation
799

800
  """
801
  inst_os = OSFromDisk(instance.os)
802

    
803
  rename_env = OSEnvironment(instance, inst_os)
804
  rename_env['OLD_INSTANCE_NAME'] = old_name
805

    
806
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
807
                                           old_name,
808
                                           instance.name, int(time.time()))
809

    
810
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
811
                        cwd=inst_os.path, output=logfile)
812

    
813
  if result.failed:
814
    logging.error("os create command '%s' returned error: %s output: %s",
815
                  result.cmd, result.fail_reason, result.output)
816
    lines = [utils.SafeEncode(val)
817
             for val in utils.TailFile(logfile, lines=20)]
818
    _Fail("OS rename script failed (%s), last lines in the"
819
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
820

    
821

    
822
def _GetVGInfo(vg_name):
823
  """Get information about the volume group.
824

825
  @type vg_name: str
826
  @param vg_name: the volume group which we query
827
  @rtype: dict
828
  @return:
829
    A dictionary with the following keys:
830
      - C{vg_size} is the total size of the volume group in MiB
831
      - C{vg_free} is the free size of the volume group in MiB
832
      - C{pv_count} are the number of physical disks in that VG
833

834
    If an error occurs during gathering of data, we return the same dict
835
    with keys all set to None.
836

837
  """
838
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
839

    
840
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
841
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
842

    
843
  if retval.failed:
844
    logging.error("volume group %s not present", vg_name)
845
    return retdic
846
  valarr = retval.stdout.strip().rstrip(':').split(':')
847
  if len(valarr) == 3:
848
    try:
849
      retdic = {
850
        "vg_size": int(round(float(valarr[0]), 0)),
851
        "vg_free": int(round(float(valarr[1]), 0)),
852
        "pv_count": int(valarr[2]),
853
        }
854
    except ValueError, err:
855
      logging.exception("Fail to parse vgs output: %s", err)
856
  else:
857
    logging.error("vgs output has the wrong number of fields (expected"
858
                  " three): %s", str(valarr))
859
  return retdic
860

    
861

    
862
def _GetBlockDevSymlinkPath(instance_name, idx):
863
  return os.path.join(constants.DISK_LINKS_DIR,
864
                      "%s:%d" % (instance_name, idx))
865

    
866

    
867
def _SymlinkBlockDev(instance_name, device_path, idx):
868
  """Set up symlinks to a instance's block device.
869

870
  This is an auxiliary function run when an instance is start (on the primary
871
  node) or when an instance is migrated (on the target node).
872

873

874
  @param instance_name: the name of the target instance
875
  @param device_path: path of the physical block device, on the node
876
  @param idx: the disk index
877
  @return: absolute path to the disk's symlink
878

879
  """
880
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
881
  try:
882
    os.symlink(device_path, link_name)
883
  except OSError, err:
884
    if err.errno == errno.EEXIST:
885
      if (not os.path.islink(link_name) or
886
          os.readlink(link_name) != device_path):
887
        os.remove(link_name)
888
        os.symlink(device_path, link_name)
889
    else:
890
      raise
891

    
892
  return link_name
893

    
894

    
895
def _RemoveBlockDevLinks(instance_name, disks):
896
  """Remove the block device symlinks belonging to the given instance.
897

898
  """
899
  for idx, _ in enumerate(disks):
900
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
901
    if os.path.islink(link_name):
902
      try:
903
        os.remove(link_name)
904
      except OSError:
905
        logging.exception("Can't remove symlink '%s'", link_name)
906

    
907

    
908
def _GatherAndLinkBlockDevs(instance):
909
  """Set up an instance's block device(s).
910

911
  This is run on the primary node at instance startup. The block
912
  devices must be already assembled.
913

914
  @type instance: L{objects.Instance}
915
  @param instance: the instance whose disks we shoul assemble
916
  @rtype: list
917
  @return: list of (disk_object, device_path)
918

919
  """
920
  block_devices = []
921
  for idx, disk in enumerate(instance.disks):
922
    device = _RecursiveFindBD(disk)
923
    if device is None:
924
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
925
                                    str(disk))
926
    device.Open()
927
    try:
928
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
929
    except OSError, e:
930
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
931
                                    e.strerror)
932

    
933
    block_devices.append((disk, link_name))
934

    
935
  return block_devices
936

    
937

    
938
def StartInstance(instance):
939
  """Start an instance.
940

941
  @type instance: L{objects.Instance}
942
  @param instance: the instance object
943
  @rtype: None
944

945
  """
946
  running_instances = GetInstanceList([instance.hypervisor])
947

    
948
  if instance.name in running_instances:
949
    logging.info("Instance %s already running, not starting", instance.name)
950
    return
951

    
952
  try:
953
    block_devices = _GatherAndLinkBlockDevs(instance)
954
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
955
    hyper.StartInstance(instance, block_devices)
956
  except errors.BlockDeviceError, err:
957
    _Fail("Block device error: %s", err, exc=True)
958
  except errors.HypervisorError, err:
959
    _RemoveBlockDevLinks(instance.name, instance.disks)
960
    _Fail("Hypervisor error: %s", err, exc=True)
961

    
962

    
963
def InstanceShutdown(instance, timeout):
964
  """Shut an instance down.
965

966
  @note: this functions uses polling with a hardcoded timeout.
967

968
  @type instance: L{objects.Instance}
969
  @param instance: the instance object
970
  @type timeout: integer
971
  @param timeout: maximum timeout for soft shutdown
972
  @rtype: None
973

974
  """
975
  hv_name = instance.hypervisor
976
  hyper = hypervisor.GetHypervisor(hv_name)
977
  running_instances = hyper.ListInstances()
978
  iname = instance.name
979

    
980
  if iname not in running_instances:
981
    logging.info("Instance %s not running, doing nothing", iname)
982
    return
983

    
984
  start = time.time()
985
  end = start + timeout
986
  sleep_time = 1
987

    
988
  tried_once = False
989
  while not tried_once and time.time() < end:
990
    try:
991
      hyper.StopInstance(instance, retry=tried_once)
992
    except errors.HypervisorError, err:
993
      _Fail("Failed to stop instance %s: %s", iname, err)
994
    tried_once = True
995
    time.sleep(sleep_time)
996
    if instance.name not in hyper.ListInstances():
997
      break
998
    if sleep_time < 5:
999
      # 1.2 behaves particularly good for our case:
1000
      # it gives us 10 increasing steps and caps just slightly above 5 seconds
1001
      sleep_time *= 1.2
1002
  else:
1003
    # the shutdown did not succeed
1004
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1005

    
1006
    try:
1007
      hyper.StopInstance(instance, force=True)
1008
    except errors.HypervisorError, err:
1009
      _Fail("Failed to force stop instance %s: %s", iname, err)
1010

    
1011
    time.sleep(1)
1012
    if instance.name in GetInstanceList([hv_name]):
1013
      _Fail("Could not shutdown instance %s even by destroy", iname)
1014

    
1015
  _RemoveBlockDevLinks(iname, instance.disks)
1016

    
1017

    
1018
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1019
  """Reboot an instance.
1020

1021
  @type instance: L{objects.Instance}
1022
  @param instance: the instance object to reboot
1023
  @type reboot_type: str
1024
  @param reboot_type: the type of reboot, one the following
1025
    constants:
1026
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1027
        instance OS, do not recreate the VM
1028
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1029
        restart the VM (at the hypervisor level)
1030
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1031
        not accepted here, since that mode is handled differently, in
1032
        cmdlib, and translates into full stop and start of the
1033
        instance (instead of a call_instance_reboot RPC)
1034
  @type timeout: integer
1035
  @param timeout: maximum timeout for soft shutdown
1036
  @rtype: None
1037

1038
  """
1039
  running_instances = GetInstanceList([instance.hypervisor])
1040

    
1041
  if instance.name not in running_instances:
1042
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1043

    
1044
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1045
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1046
    try:
1047
      hyper.RebootInstance(instance)
1048
    except errors.HypervisorError, err:
1049
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1050
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1051
    try:
1052
      InstanceShutdown(instance, shutdown_timeout)
1053
      return StartInstance(instance)
1054
    except errors.HypervisorError, err:
1055
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1056
  else:
1057
    _Fail("Invalid reboot_type received: %s", reboot_type)
1058

    
1059

    
1060
def MigrationInfo(instance):
1061
  """Gather information about an instance to be migrated.
1062

1063
  @type instance: L{objects.Instance}
1064
  @param instance: the instance definition
1065

1066
  """
1067
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1068
  try:
1069
    info = hyper.MigrationInfo(instance)
1070
  except errors.HypervisorError, err:
1071
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1072
  return info
1073

    
1074

    
1075
def AcceptInstance(instance, info, target):
1076
  """Prepare the node to accept an instance.
1077

1078
  @type instance: L{objects.Instance}
1079
  @param instance: the instance definition
1080
  @type info: string/data (opaque)
1081
  @param info: migration information, from the source node
1082
  @type target: string
1083
  @param target: target host (usually ip), on this node
1084

1085
  """
1086
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1087
  try:
1088
    hyper.AcceptInstance(instance, info, target)
1089
  except errors.HypervisorError, err:
1090
    _Fail("Failed to accept instance: %s", err, exc=True)
1091

    
1092

    
1093
def FinalizeMigration(instance, info, success):
1094
  """Finalize any preparation to accept an instance.
1095

1096
  @type instance: L{objects.Instance}
1097
  @param instance: the instance definition
1098
  @type info: string/data (opaque)
1099
  @param info: migration information, from the source node
1100
  @type success: boolean
1101
  @param success: whether the migration was a success or a failure
1102

1103
  """
1104
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1105
  try:
1106
    hyper.FinalizeMigration(instance, info, success)
1107
  except errors.HypervisorError, err:
1108
    _Fail("Failed to finalize migration: %s", err, exc=True)
1109

    
1110

    
1111
def MigrateInstance(instance, target, live):
1112
  """Migrates an instance to another node.
1113

1114
  @type instance: L{objects.Instance}
1115
  @param instance: the instance definition
1116
  @type target: string
1117
  @param target: the target node name
1118
  @type live: boolean
1119
  @param live: whether the migration should be done live or not (the
1120
      interpretation of this parameter is left to the hypervisor)
1121
  @rtype: tuple
1122
  @return: a tuple of (success, msg) where:
1123
      - succes is a boolean denoting the success/failure of the operation
1124
      - msg is a string with details in case of failure
1125

1126
  """
1127
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1128

    
1129
  try:
1130
    hyper.MigrateInstance(instance.name, target, live)
1131
  except errors.HypervisorError, err:
1132
    _Fail("Failed to migrate instance: %s", err, exc=True)
1133

    
1134

    
1135
def BlockdevCreate(disk, size, owner, on_primary, info):
1136
  """Creates a block device for an instance.
1137

1138
  @type disk: L{objects.Disk}
1139
  @param disk: the object describing the disk we should create
1140
  @type size: int
1141
  @param size: the size of the physical underlying device, in MiB
1142
  @type owner: str
1143
  @param owner: the name of the instance for which disk is created,
1144
      used for device cache data
1145
  @type on_primary: boolean
1146
  @param on_primary:  indicates if it is the primary node or not
1147
  @type info: string
1148
  @param info: string that will be sent to the physical device
1149
      creation, used for example to set (LVM) tags on LVs
1150

1151
  @return: the new unique_id of the device (this can sometime be
1152
      computed only after creation), or None. On secondary nodes,
1153
      it's not required to return anything.
1154

1155
  """
1156
  clist = []
1157
  if disk.children:
1158
    for child in disk.children:
1159
      try:
1160
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1161
      except errors.BlockDeviceError, err:
1162
        _Fail("Can't assemble device %s: %s", child, err)
1163
      if on_primary or disk.AssembleOnSecondary():
1164
        # we need the children open in case the device itself has to
1165
        # be assembled
1166
        try:
1167
          crdev.Open()
1168
        except errors.BlockDeviceError, err:
1169
          _Fail("Can't make child '%s' read-write: %s", child, err)
1170
      clist.append(crdev)
1171

    
1172
  try:
1173
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1174
  except errors.BlockDeviceError, err:
1175
    _Fail("Can't create block device: %s", err)
1176

    
1177
  if on_primary or disk.AssembleOnSecondary():
1178
    try:
1179
      device.Assemble()
1180
    except errors.BlockDeviceError, err:
1181
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1182
    device.SetSyncSpeed(constants.SYNC_SPEED)
1183
    if on_primary or disk.OpenOnSecondary():
1184
      try:
1185
        device.Open(force=True)
1186
      except errors.BlockDeviceError, err:
1187
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1188
    DevCacheManager.UpdateCache(device.dev_path, owner,
1189
                                on_primary, disk.iv_name)
1190

    
1191
  device.SetInfo(info)
1192

    
1193
  return device.unique_id
1194

    
1195

    
1196
def BlockdevRemove(disk):
1197
  """Remove a block device.
1198

1199
  @note: This is intended to be called recursively.
1200

1201
  @type disk: L{objects.Disk}
1202
  @param disk: the disk object we should remove
1203
  @rtype: boolean
1204
  @return: the success of the operation
1205

1206
  """
1207
  msgs = []
1208
  try:
1209
    rdev = _RecursiveFindBD(disk)
1210
  except errors.BlockDeviceError, err:
1211
    # probably can't attach
1212
    logging.info("Can't attach to device %s in remove", disk)
1213
    rdev = None
1214
  if rdev is not None:
1215
    r_path = rdev.dev_path
1216
    try:
1217
      rdev.Remove()
1218
    except errors.BlockDeviceError, err:
1219
      msgs.append(str(err))
1220
    if not msgs:
1221
      DevCacheManager.RemoveCache(r_path)
1222

    
1223
  if disk.children:
1224
    for child in disk.children:
1225
      try:
1226
        BlockdevRemove(child)
1227
      except RPCFail, err:
1228
        msgs.append(str(err))
1229

    
1230
  if msgs:
1231
    _Fail("; ".join(msgs))
1232

    
1233

    
1234
def _RecursiveAssembleBD(disk, owner, as_primary):
1235
  """Activate a block device for an instance.
1236

1237
  This is run on the primary and secondary nodes for an instance.
1238

1239
  @note: this function is called recursively.
1240

1241
  @type disk: L{objects.Disk}
1242
  @param disk: the disk we try to assemble
1243
  @type owner: str
1244
  @param owner: the name of the instance which owns the disk
1245
  @type as_primary: boolean
1246
  @param as_primary: if we should make the block device
1247
      read/write
1248

1249
  @return: the assembled device or None (in case no device
1250
      was assembled)
1251
  @raise errors.BlockDeviceError: in case there is an error
1252
      during the activation of the children or the device
1253
      itself
1254

1255
  """
1256
  children = []
1257
  if disk.children:
1258
    mcn = disk.ChildrenNeeded()
1259
    if mcn == -1:
1260
      mcn = 0 # max number of Nones allowed
1261
    else:
1262
      mcn = len(disk.children) - mcn # max number of Nones
1263
    for chld_disk in disk.children:
1264
      try:
1265
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1266
      except errors.BlockDeviceError, err:
1267
        if children.count(None) >= mcn:
1268
          raise
1269
        cdev = None
1270
        logging.error("Error in child activation (but continuing): %s",
1271
                      str(err))
1272
      children.append(cdev)
1273

    
1274
  if as_primary or disk.AssembleOnSecondary():
1275
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1276
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1277
    result = r_dev
1278
    if as_primary or disk.OpenOnSecondary():
1279
      r_dev.Open()
1280
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1281
                                as_primary, disk.iv_name)
1282

    
1283
  else:
1284
    result = True
1285
  return result
1286

    
1287

    
1288
def BlockdevAssemble(disk, owner, as_primary):
1289
  """Activate a block device for an instance.
1290

1291
  This is a wrapper over _RecursiveAssembleBD.
1292

1293
  @rtype: str or boolean
1294
  @return: a C{/dev/...} path for primary nodes, and
1295
      C{True} for secondary nodes
1296

1297
  """
1298
  try:
1299
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1300
    if isinstance(result, bdev.BlockDev):
1301
      result = result.dev_path
1302
  except errors.BlockDeviceError, err:
1303
    _Fail("Error while assembling disk: %s", err, exc=True)
1304

    
1305
  return result
1306

    
1307

    
1308
def BlockdevShutdown(disk):
1309
  """Shut down a block device.
1310

1311
  First, if the device is assembled (Attach() is successful), then
1312
  the device is shutdown. Then the children of the device are
1313
  shutdown.
1314

1315
  This function is called recursively. Note that we don't cache the
1316
  children or such, as oppossed to assemble, shutdown of different
1317
  devices doesn't require that the upper device was active.
1318

1319
  @type disk: L{objects.Disk}
1320
  @param disk: the description of the disk we should
1321
      shutdown
1322
  @rtype: None
1323

1324
  """
1325
  msgs = []
1326
  r_dev = _RecursiveFindBD(disk)
1327
  if r_dev is not None:
1328
    r_path = r_dev.dev_path
1329
    try:
1330
      r_dev.Shutdown()
1331
      DevCacheManager.RemoveCache(r_path)
1332
    except errors.BlockDeviceError, err:
1333
      msgs.append(str(err))
1334

    
1335
  if disk.children:
1336
    for child in disk.children:
1337
      try:
1338
        BlockdevShutdown(child)
1339
      except RPCFail, err:
1340
        msgs.append(str(err))
1341

    
1342
  if msgs:
1343
    _Fail("; ".join(msgs))
1344

    
1345

    
1346
def BlockdevAddchildren(parent_cdev, new_cdevs):
1347
  """Extend a mirrored block device.
1348

1349
  @type parent_cdev: L{objects.Disk}
1350
  @param parent_cdev: the disk to which we should add children
1351
  @type new_cdevs: list of L{objects.Disk}
1352
  @param new_cdevs: the list of children which we should add
1353
  @rtype: None
1354

1355
  """
1356
  parent_bdev = _RecursiveFindBD(parent_cdev)
1357
  if parent_bdev is None:
1358
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
1359
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1360
  if new_bdevs.count(None) > 0:
1361
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1362
  parent_bdev.AddChildren(new_bdevs)
1363

    
1364

    
1365
def BlockdevRemovechildren(parent_cdev, new_cdevs):
1366
  """Shrink a mirrored block device.
1367

1368
  @type parent_cdev: L{objects.Disk}
1369
  @param parent_cdev: the disk from which we should remove children
1370
  @type new_cdevs: list of L{objects.Disk}
1371
  @param new_cdevs: the list of children which we should remove
1372
  @rtype: None
1373

1374
  """
1375
  parent_bdev = _RecursiveFindBD(parent_cdev)
1376
  if parent_bdev is None:
1377
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1378
  devs = []
1379
  for disk in new_cdevs:
1380
    rpath = disk.StaticDevPath()
1381
    if rpath is None:
1382
      bd = _RecursiveFindBD(disk)
1383
      if bd is None:
1384
        _Fail("Can't find device %s while removing children", disk)
1385
      else:
1386
        devs.append(bd.dev_path)
1387
    else:
1388
      devs.append(rpath)
1389
  parent_bdev.RemoveChildren(devs)
1390

    
1391

    
1392
def BlockdevGetmirrorstatus(disks):
1393
  """Get the mirroring status of a list of devices.
1394

1395
  @type disks: list of L{objects.Disk}
1396
  @param disks: the list of disks which we should query
1397
  @rtype: disk
1398
  @return:
1399
      a list of (mirror_done, estimated_time) tuples, which
1400
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
1401
  @raise errors.BlockDeviceError: if any of the disks cannot be
1402
      found
1403

1404
  """
1405
  stats = []
1406
  for dsk in disks:
1407
    rbd = _RecursiveFindBD(dsk)
1408
    if rbd is None:
1409
      _Fail("Can't find device %s", dsk)
1410

    
1411
    stats.append(rbd.CombinedSyncStatus())
1412

    
1413
  return stats
1414

    
1415

    
1416
def _RecursiveFindBD(disk):
1417
  """Check if a device is activated.
1418

1419
  If so, return information about the real device.
1420

1421
  @type disk: L{objects.Disk}
1422
  @param disk: the disk object we need to find
1423

1424
  @return: None if the device can't be found,
1425
      otherwise the device instance
1426

1427
  """
1428
  children = []
1429
  if disk.children:
1430
    for chdisk in disk.children:
1431
      children.append(_RecursiveFindBD(chdisk))
1432

    
1433
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1434

    
1435

    
1436
def BlockdevFind(disk):
1437
  """Check if a device is activated.
1438

1439
  If it is, return information about the real device.
1440

1441
  @type disk: L{objects.Disk}
1442
  @param disk: the disk to find
1443
  @rtype: None or objects.BlockDevStatus
1444
  @return: None if the disk cannot be found, otherwise a the current
1445
           information
1446

1447
  """
1448
  try:
1449
    rbd = _RecursiveFindBD(disk)
1450
  except errors.BlockDeviceError, err:
1451
    _Fail("Failed to find device: %s", err, exc=True)
1452

    
1453
  if rbd is None:
1454
    return None
1455

    
1456
  return rbd.GetSyncStatus()
1457

    
1458

    
1459
def BlockdevGetsize(disks):
1460
  """Computes the size of the given disks.
1461

1462
  If a disk is not found, returns None instead.
1463

1464
  @type disks: list of L{objects.Disk}
1465
  @param disks: the list of disk to compute the size for
1466
  @rtype: list
1467
  @return: list with elements None if the disk cannot be found,
1468
      otherwise the size
1469

1470
  """
1471
  result = []
1472
  for cf in disks:
1473
    try:
1474
      rbd = _RecursiveFindBD(cf)
1475
    except errors.BlockDeviceError, err:
1476
      result.append(None)
1477
      continue
1478
    if rbd is None:
1479
      result.append(None)
1480
    else:
1481
      result.append(rbd.GetActualSize())
1482
  return result
1483

    
1484

    
1485
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1486
  """Export a block device to a remote node.
1487

1488
  @type disk: L{objects.Disk}
1489
  @param disk: the description of the disk to export
1490
  @type dest_node: str
1491
  @param dest_node: the destination node to export to
1492
  @type dest_path: str
1493
  @param dest_path: the destination path on the target node
1494
  @type cluster_name: str
1495
  @param cluster_name: the cluster name, needed for SSH hostalias
1496
  @rtype: None
1497

1498
  """
1499
  real_disk = _RecursiveFindBD(disk)
1500
  if real_disk is None:
1501
    _Fail("Block device '%s' is not set up", disk)
1502

    
1503
  real_disk.Open()
1504

    
1505
  # the block size on the read dd is 1MiB to match our units
1506
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1507
                               "dd if=%s bs=1048576 count=%s",
1508
                               real_disk.dev_path, str(disk.size))
1509

    
1510
  # we set here a smaller block size as, due to ssh buffering, more
1511
  # than 64-128k will mostly ignored; we use nocreat to fail if the
1512
  # device is not already there or we pass a wrong path; we use
1513
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1514
  # to not buffer too much memory; this means that at best, we flush
1515
  # every 64k, which will not be very fast
1516
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1517
                                " oflag=dsync", dest_path)
1518

    
1519
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1520
                                                   constants.GANETI_RUNAS,
1521
                                                   destcmd)
1522

    
1523
  # all commands have been checked, so we're safe to combine them
1524
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1525

    
1526
  result = utils.RunCmd(["bash", "-c", command])
1527

    
1528
  if result.failed:
1529
    _Fail("Disk copy command '%s' returned error: %s"
1530
          " output: %s", command, result.fail_reason, result.output)
1531

    
1532

    
1533
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1534
  """Write a file to the filesystem.
1535

1536
  This allows the master to overwrite(!) a file. It will only perform
1537
  the operation if the file belongs to a list of configuration files.
1538

1539
  @type file_name: str
1540
  @param file_name: the target file name
1541
  @type data: str
1542
  @param data: the new contents of the file
1543
  @type mode: int
1544
  @param mode: the mode to give the file (can be None)
1545
  @type uid: int
1546
  @param uid: the owner of the file (can be -1 for default)
1547
  @type gid: int
1548
  @param gid: the group of the file (can be -1 for default)
1549
  @type atime: float
1550
  @param atime: the atime to set on the file (can be None)
1551
  @type mtime: float
1552
  @param mtime: the mtime to set on the file (can be None)
1553
  @rtype: None
1554

1555
  """
1556
  if not os.path.isabs(file_name):
1557
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1558

    
1559
  if file_name not in _ALLOWED_UPLOAD_FILES:
1560
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1561
          file_name)
1562

    
1563
  raw_data = _Decompress(data)
1564

    
1565
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1566
                  atime=atime, mtime=mtime)
1567

    
1568

    
1569
def WriteSsconfFiles(values):
1570
  """Update all ssconf files.
1571

1572
  Wrapper around the SimpleStore.WriteFiles.
1573

1574
  """
1575
  ssconf.SimpleStore().WriteFiles(values)
1576

    
1577

    
1578
def _ErrnoOrStr(err):
1579
  """Format an EnvironmentError exception.
1580

1581
  If the L{err} argument has an errno attribute, it will be looked up
1582
  and converted into a textual C{E...} description. Otherwise the
1583
  string representation of the error will be returned.
1584

1585
  @type err: L{EnvironmentError}
1586
  @param err: the exception to format
1587

1588
  """
1589
  if hasattr(err, 'errno'):
1590
    detail = errno.errorcode[err.errno]
1591
  else:
1592
    detail = str(err)
1593
  return detail
1594

    
1595

    
1596
def _OSOndiskAPIVersion(name, os_dir):
1597
  """Compute and return the API version of a given OS.
1598

1599
  This function will try to read the API version of the OS given by
1600
  the 'name' parameter and residing in the 'os_dir' directory.
1601

1602
  @type name: str
1603
  @param name: the OS name we should look for
1604
  @type os_dir: str
1605
  @param os_dir: the directory inwhich we should look for the OS
1606
  @rtype: tuple
1607
  @return: tuple (status, data) with status denoting the validity and
1608
      data holding either the vaid versions or an error message
1609

1610
  """
1611
  api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
1612

    
1613
  try:
1614
    st = os.stat(api_file)
1615
  except EnvironmentError, err:
1616
    return False, ("Required file '%s' not found under path %s: %s" %
1617
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1618

    
1619
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1620
    return False, ("File '%s' in %s is not a regular file" %
1621
                   (constants.OS_API_FILE, os_dir))
1622

    
1623
  try:
1624
    api_versions = utils.ReadFile(api_file).splitlines()
1625
  except EnvironmentError, err:
1626
    return False, ("Error while reading the API version file at %s: %s" %
1627
                   (api_file, _ErrnoOrStr(err)))
1628

    
1629
  try:
1630
    api_versions = [int(version.strip()) for version in api_versions]
1631
  except (TypeError, ValueError), err:
1632
    return False, ("API version(s) can't be converted to integer: %s" %
1633
                   str(err))
1634

    
1635
  return True, api_versions
1636

    
1637

    
1638
def DiagnoseOS(top_dirs=None):
1639
  """Compute the validity for all OSes.
1640

1641
  @type top_dirs: list
1642
  @param top_dirs: the list of directories in which to
1643
      search (if not given defaults to
1644
      L{constants.OS_SEARCH_PATH})
1645
  @rtype: list of L{objects.OS}
1646
  @return: a list of tuples (name, path, status, diagnose, variants)
1647
      for all (potential) OSes under all search paths, where:
1648
          - name is the (potential) OS name
1649
          - path is the full path to the OS
1650
          - status True/False is the validity of the OS
1651
          - diagnose is the error message for an invalid OS, otherwise empty
1652
          - variants is a list of supported OS variants, if any
1653

1654
  """
1655
  if top_dirs is None:
1656
    top_dirs = constants.OS_SEARCH_PATH
1657

    
1658
  result = []
1659
  for dir_name in top_dirs:
1660
    if os.path.isdir(dir_name):
1661
      try:
1662
        f_names = utils.ListVisibleFiles(dir_name)
1663
      except EnvironmentError, err:
1664
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1665
        break
1666
      for name in f_names:
1667
        os_path = os.path.sep.join([dir_name, name])
1668
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1669
        if status:
1670
          diagnose = ""
1671
          variants = os_inst.supported_variants
1672
        else:
1673
          diagnose = os_inst
1674
          variants = []
1675
        result.append((name, os_path, status, diagnose, variants))
1676

    
1677
  return result
1678

    
1679

    
1680
def _TryOSFromDisk(name, base_dir=None):
1681
  """Create an OS instance from disk.
1682

1683
  This function will return an OS instance if the given name is a
1684
  valid OS name.
1685

1686
  @type base_dir: string
1687
  @keyword base_dir: Base directory containing OS installations.
1688
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1689
  @rtype: tuple
1690
  @return: success and either the OS instance if we find a valid one,
1691
      or error message
1692

1693
  """
1694
  if base_dir is None:
1695
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1696
    if os_dir is None:
1697
      return False, "Directory for OS %s not found in search path" % name
1698
  else:
1699
    os_dir = os.path.sep.join([base_dir, name])
1700

    
1701
  status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1702
  if not status:
1703
    # push the error up
1704
    return status, api_versions
1705

    
1706
  if not constants.OS_API_VERSIONS.intersection(api_versions):
1707
    return False, ("API version mismatch for path '%s': found %s, want %s." %
1708
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
1709

    
1710
  # OS Files dictionary, we will populate it with the absolute path names
1711
  os_files = dict.fromkeys(constants.OS_SCRIPTS)
1712

    
1713
  if max(api_versions) >= constants.OS_API_V15:
1714
    os_files[constants.OS_VARIANTS_FILE] = ''
1715

    
1716
  for name in os_files:
1717
    os_files[name] = os.path.sep.join([os_dir, name])
1718

    
1719
    try:
1720
      st = os.stat(os_files[name])
1721
    except EnvironmentError, err:
1722
      return False, ("File '%s' under path '%s' is missing (%s)" %
1723
                     (name, os_dir, _ErrnoOrStr(err)))
1724

    
1725
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1726
      return False, ("File '%s' under path '%s' is not a regular file" %
1727
                     (name, os_dir))
1728

    
1729
    if name in constants.OS_SCRIPTS:
1730
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1731
        return False, ("File '%s' under path '%s' is not executable" %
1732
                       (name, os_dir))
1733

    
1734
  variants = None
1735
  if constants.OS_VARIANTS_FILE in os_files:
1736
    variants_file = os_files[constants.OS_VARIANTS_FILE]
1737
    try:
1738
      variants = utils.ReadFile(variants_file).splitlines()
1739
    except EnvironmentError, err:
1740
      return False, ("Error while reading the OS variants file at %s: %s" %
1741
                     (variants_file, _ErrnoOrStr(err)))
1742
    if not variants:
1743
      return False, ("No supported os variant found")
1744

    
1745
  os_obj = objects.OS(name=name, path=os_dir,
1746
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
1747
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
1748
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
1749
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
1750
                      supported_variants=variants,
1751
                      api_versions=api_versions)
1752
  return True, os_obj
1753

    
1754

    
1755
def OSFromDisk(name, base_dir=None):
1756
  """Create an OS instance from disk.
1757

1758
  This function will return an OS instance if the given name is a
1759
  valid OS name. Otherwise, it will raise an appropriate
1760
  L{RPCFail} exception, detailing why this is not a valid OS.
1761

1762
  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1763
  an exception but returns true/false status data.
1764

1765
  @type base_dir: string
1766
  @keyword base_dir: Base directory containing OS installations.
1767
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1768
  @rtype: L{objects.OS}
1769
  @return: the OS instance if we find a valid one
1770
  @raise RPCFail: if we don't find a valid OS
1771

1772
  """
1773
  name_only = name.split("+", 1)[0]
1774
  status, payload = _TryOSFromDisk(name_only, base_dir)
1775

    
1776
  if not status:
1777
    _Fail(payload)
1778

    
1779
  return payload
1780

    
1781

    
1782
def OSEnvironment(instance, os, debug=0):
1783
  """Calculate the environment for an os script.
1784

1785
  @type instance: L{objects.Instance}
1786
  @param instance: target instance for the os script run
1787
  @type os: L{objects.OS}
1788
  @param os: operating system for which the environment is being built
1789
  @type debug: integer
1790
  @param debug: debug level (0 or 1, for OS Api 10)
1791
  @rtype: dict
1792
  @return: dict of environment variables
1793
  @raise errors.BlockDeviceError: if the block device
1794
      cannot be found
1795

1796
  """
1797
  result = {}
1798
  api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
1799
  result['OS_API_VERSION'] = '%d' % api_version
1800
  result['INSTANCE_NAME'] = instance.name
1801
  result['INSTANCE_OS'] = instance.os
1802
  result['HYPERVISOR'] = instance.hypervisor
1803
  result['DISK_COUNT'] = '%d' % len(instance.disks)
1804
  result['NIC_COUNT'] = '%d' % len(instance.nics)
1805
  result['DEBUG_LEVEL'] = '%d' % debug
1806
  if api_version >= constants.OS_API_V15:
1807
    try:
1808
      variant = instance.os.split('+', 1)[1]
1809
    except IndexError:
1810
      variant = os.supported_variants[0]
1811
    result['OS_VARIANT'] = variant
1812
  for idx, disk in enumerate(instance.disks):
1813
    real_disk = _RecursiveFindBD(disk)
1814
    if real_disk is None:
1815
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
1816
                                    str(disk))
1817
    real_disk.Open()
1818
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
1819
    result['DISK_%d_ACCESS' % idx] = disk.mode
1820
    if constants.HV_DISK_TYPE in instance.hvparams:
1821
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
1822
        instance.hvparams[constants.HV_DISK_TYPE]
1823
    if disk.dev_type in constants.LDS_BLOCK:
1824
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1825
    elif disk.dev_type == constants.LD_FILE:
1826
      result['DISK_%d_BACKEND_TYPE' % idx] = \
1827
        'file:%s' % disk.physical_id[0]
1828
  for idx, nic in enumerate(instance.nics):
1829
    result['NIC_%d_MAC' % idx] = nic.mac
1830
    if nic.ip:
1831
      result['NIC_%d_IP' % idx] = nic.ip
1832
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1833
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1834
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1835
    if nic.nicparams[constants.NIC_LINK]:
1836
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1837
    if constants.HV_NIC_TYPE in instance.hvparams:
1838
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
1839
        instance.hvparams[constants.HV_NIC_TYPE]
1840

    
1841
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1842
    for key, value in source.items():
1843
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1844

    
1845
  return result
1846

    
1847
def BlockdevGrow(disk, amount):
1848
  """Grow a stack of block devices.
1849

1850
  This function is called recursively, with the childrens being the
1851
  first ones to resize.
1852

1853
  @type disk: L{objects.Disk}
1854
  @param disk: the disk to be grown
1855
  @rtype: (status, result)
1856
  @return: a tuple with the status of the operation
1857
      (True/False), and the errors message if status
1858
      is False
1859

1860
  """
1861
  r_dev = _RecursiveFindBD(disk)
1862
  if r_dev is None:
1863
    _Fail("Cannot find block device %s", disk)
1864

    
1865
  try:
1866
    r_dev.Grow(amount)
1867
  except errors.BlockDeviceError, err:
1868
    _Fail("Failed to grow block device: %s", err, exc=True)
1869

    
1870

    
1871
def BlockdevSnapshot(disk):
1872
  """Create a snapshot copy of a block device.
1873

1874
  This function is called recursively, and the snapshot is actually created
1875
  just for the leaf lvm backend device.
1876

1877
  @type disk: L{objects.Disk}
1878
  @param disk: the disk to be snapshotted
1879
  @rtype: string
1880
  @return: snapshot disk path
1881

1882
  """
1883
  if disk.children:
1884
    if len(disk.children) == 1:
1885
      # only one child, let's recurse on it
1886
      return BlockdevSnapshot(disk.children[0])
1887
    else:
1888
      # more than one child, choose one that matches
1889
      for child in disk.children:
1890
        if child.size == disk.size:
1891
          # return implies breaking the loop
1892
          return BlockdevSnapshot(child)
1893
  elif disk.dev_type == constants.LD_LV:
1894
    r_dev = _RecursiveFindBD(disk)
1895
    if r_dev is not None:
1896
      # let's stay on the safe side and ask for the full size, for now
1897
      return r_dev.Snapshot(disk.size)
1898
    else:
1899
      _Fail("Cannot find block device %s", disk)
1900
  else:
1901
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1902
          disk.unique_id, disk.dev_type)
1903

    
1904

    
1905
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1906
  """Export a block device snapshot to a remote node.
1907

1908
  @type disk: L{objects.Disk}
1909
  @param disk: the description of the disk to export
1910
  @type dest_node: str
1911
  @param dest_node: the destination node to export to
1912
  @type instance: L{objects.Instance}
1913
  @param instance: the instance object to whom the disk belongs
1914
  @type cluster_name: str
1915
  @param cluster_name: the cluster name, needed for SSH hostalias
1916
  @type idx: int
1917
  @param idx: the index of the disk in the instance's disk list,
1918
      used to export to the OS scripts environment
1919
  @rtype: None
1920

1921
  """
1922
  inst_os = OSFromDisk(instance.os)
1923
  export_env = OSEnvironment(instance, inst_os)
1924

    
1925
  export_script = inst_os.export_script
1926

    
1927
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1928
                                     instance.name, int(time.time()))
1929
  if not os.path.exists(constants.LOG_OS_DIR):
1930
    os.mkdir(constants.LOG_OS_DIR, 0750)
1931
  real_disk = _RecursiveFindBD(disk)
1932
  if real_disk is None:
1933
    _Fail("Block device '%s' is not set up", disk)
1934

    
1935
  real_disk.Open()
1936

    
1937
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
1938
  export_env['EXPORT_INDEX'] = str(idx)
1939

    
1940
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1941
  destfile = disk.physical_id[1]
1942

    
1943
  # the target command is built out of three individual commands,
1944
  # which are joined by pipes; we check each individual command for
1945
  # valid parameters
1946
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1947
                               inst_os.path, export_script, logfile)
1948

    
1949
  comprcmd = "gzip"
1950

    
1951
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1952
                                destdir, destdir, destfile)
1953
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1954
                                                   constants.GANETI_RUNAS,
1955
                                                   destcmd)
1956

    
1957
  # all commands have been checked, so we're safe to combine them
1958
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1959

    
1960
  result = utils.RunCmd(["bash", "-c", command], env=export_env)
1961

    
1962
  if result.failed:
1963
    _Fail("OS snapshot export command '%s' returned error: %s"
1964
          " output: %s", command, result.fail_reason, result.output)
1965

    
1966

    
1967
def FinalizeExport(instance, snap_disks):
1968
  """Write out the export configuration information.
1969

1970
  @type instance: L{objects.Instance}
1971
  @param instance: the instance which we export, used for
1972
      saving configuration
1973
  @type snap_disks: list of L{objects.Disk}
1974
  @param snap_disks: list of snapshot block devices, which
1975
      will be used to get the actual name of the dump file
1976

1977
  @rtype: None
1978

1979
  """
1980
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1981
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1982

    
1983
  config = objects.SerializableConfigParser()
1984

    
1985
  config.add_section(constants.INISECT_EXP)
1986
  config.set(constants.INISECT_EXP, 'version', '0')
1987
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1988
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1989
  config.set(constants.INISECT_EXP, 'os', instance.os)
1990
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
1991

    
1992
  config.add_section(constants.INISECT_INS)
1993
  config.set(constants.INISECT_INS, 'name', instance.name)
1994
  config.set(constants.INISECT_INS, 'memory', '%d' %
1995
             instance.beparams[constants.BE_MEMORY])
1996
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
1997
             instance.beparams[constants.BE_VCPUS])
1998
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1999

    
2000
  nic_total = 0
2001
  for nic_count, nic in enumerate(instance.nics):
2002
    nic_total += 1
2003
    config.set(constants.INISECT_INS, 'nic%d_mac' %
2004
               nic_count, '%s' % nic.mac)
2005
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2006
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
2007
               '%s' % nic.bridge)
2008
  # TODO: redundant: on load can read nics until it doesn't exist
2009
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2010

    
2011
  disk_total = 0
2012
  for disk_count, disk in enumerate(snap_disks):
2013
    if disk:
2014
      disk_total += 1
2015
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2016
                 ('%s' % disk.iv_name))
2017
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2018
                 ('%s' % disk.physical_id[1]))
2019
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2020
                 ('%d' % disk.size))
2021

    
2022
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2023

    
2024
  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
2025
                  data=config.Dumps())
2026
  shutil.rmtree(finaldestdir, True)
2027
  shutil.move(destdir, finaldestdir)
2028

    
2029

    
2030
def ExportInfo(dest):
2031
  """Get export configuration information.
2032

2033
  @type dest: str
2034
  @param dest: directory containing the export
2035

2036
  @rtype: L{objects.SerializableConfigParser}
2037
  @return: a serializable config file containing the
2038
      export info
2039

2040
  """
2041
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
2042

    
2043
  config = objects.SerializableConfigParser()
2044
  config.read(cff)
2045

    
2046
  if (not config.has_section(constants.INISECT_EXP) or
2047
      not config.has_section(constants.INISECT_INS)):
2048
    _Fail("Export info file doesn't have the required fields")
2049

    
2050
  return config.Dumps()
2051

    
2052

    
2053
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2054
  """Import an os image into an instance.
2055

2056
  @type instance: L{objects.Instance}
2057
  @param instance: instance to import the disks into
2058
  @type src_node: string
2059
  @param src_node: source node for the disk images
2060
  @type src_images: list of string
2061
  @param src_images: absolute paths of the disk images
2062
  @rtype: list of boolean
2063
  @return: each boolean represent the success of importing the n-th disk
2064

2065
  """
2066
  inst_os = OSFromDisk(instance.os)
2067
  import_env = OSEnvironment(instance, inst_os)
2068
  import_script = inst_os.import_script
2069

    
2070
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2071
                                        instance.name, int(time.time()))
2072
  if not os.path.exists(constants.LOG_OS_DIR):
2073
    os.mkdir(constants.LOG_OS_DIR, 0750)
2074

    
2075
  comprcmd = "gunzip"
2076
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2077
                               import_script, logfile)
2078

    
2079
  final_result = []
2080
  for idx, image in enumerate(src_images):
2081
    if image:
2082
      destcmd = utils.BuildShellCmd('cat %s', image)
2083
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2084
                                                       constants.GANETI_RUNAS,
2085
                                                       destcmd)
2086
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2087
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2088
      import_env['IMPORT_INDEX'] = str(idx)
2089
      result = utils.RunCmd(command, env=import_env)
2090
      if result.failed:
2091
        logging.error("Disk import command '%s' returned error: %s"
2092
                      " output: %s", command, result.fail_reason,
2093
                      result.output)
2094
        final_result.append("error importing disk %d: %s, %s" %
2095
                            (idx, result.fail_reason, result.output[-100]))
2096

    
2097
  if final_result:
2098
    _Fail("; ".join(final_result), log=False)
2099

    
2100

    
2101
def ListExports():
2102
  """Return a list of exports currently available on this machine.
2103

2104
  @rtype: list
2105
  @return: list of the exports
2106

2107
  """
2108
  if os.path.isdir(constants.EXPORT_DIR):
2109
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
2110
  else:
2111
    _Fail("No exports directory")
2112

    
2113

    
2114
def RemoveExport(export):
2115
  """Remove an existing export from the node.
2116

2117
  @type export: str
2118
  @param export: the name of the export to remove
2119
  @rtype: None
2120

2121
  """
2122
  target = os.path.join(constants.EXPORT_DIR, export)
2123

    
2124
  try:
2125
    shutil.rmtree(target)
2126
  except EnvironmentError, err:
2127
    _Fail("Error while removing the export: %s", err, exc=True)
2128

    
2129

    
2130
def BlockdevRename(devlist):
2131
  """Rename a list of block devices.
2132

2133
  @type devlist: list of tuples
2134
  @param devlist: list of tuples of the form  (disk,
2135
      new_logical_id, new_physical_id); disk is an
2136
      L{objects.Disk} object describing the current disk,
2137
      and new logical_id/physical_id is the name we
2138
      rename it to
2139
  @rtype: boolean
2140
  @return: True if all renames succeeded, False otherwise
2141

2142
  """
2143
  msgs = []
2144
  result = True
2145
  for disk, unique_id in devlist:
2146
    dev = _RecursiveFindBD(disk)
2147
    if dev is None:
2148
      msgs.append("Can't find device %s in rename" % str(disk))
2149
      result = False
2150
      continue
2151
    try:
2152
      old_rpath = dev.dev_path
2153
      dev.Rename(unique_id)
2154
      new_rpath = dev.dev_path
2155
      if old_rpath != new_rpath:
2156
        DevCacheManager.RemoveCache(old_rpath)
2157
        # FIXME: we should add the new cache information here, like:
2158
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2159
        # but we don't have the owner here - maybe parse from existing
2160
        # cache? for now, we only lose lvm data when we rename, which
2161
        # is less critical than DRBD or MD
2162
    except errors.BlockDeviceError, err:
2163
      msgs.append("Can't rename device '%s' to '%s': %s" %
2164
                  (dev, unique_id, err))
2165
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2166
      result = False
2167
  if not result:
2168
    _Fail("; ".join(msgs))
2169

    
2170

    
2171
def _TransformFileStorageDir(file_storage_dir):
2172
  """Checks whether given file_storage_dir is valid.
2173

2174
  Checks wheter the given file_storage_dir is within the cluster-wide
2175
  default file_storage_dir stored in SimpleStore. Only paths under that
2176
  directory are allowed.
2177

2178
  @type file_storage_dir: str
2179
  @param file_storage_dir: the path to check
2180

2181
  @return: the normalized path if valid, None otherwise
2182

2183
  """
2184
  cfg = _GetConfig()
2185
  file_storage_dir = os.path.normpath(file_storage_dir)
2186
  base_file_storage_dir = cfg.GetFileStorageDir()
2187
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2188
      base_file_storage_dir):
2189
    _Fail("File storage directory '%s' is not under base file"
2190
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2191
  return file_storage_dir
2192

    
2193

    
2194
def CreateFileStorageDir(file_storage_dir):
2195
  """Create file storage directory.
2196

2197
  @type file_storage_dir: str
2198
  @param file_storage_dir: directory to create
2199

2200
  @rtype: tuple
2201
  @return: tuple with first element a boolean indicating wheter dir
2202
      creation was successful or not
2203

2204
  """
2205
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2206
  if os.path.exists(file_storage_dir):
2207
    if not os.path.isdir(file_storage_dir):
2208
      _Fail("Specified storage dir '%s' is not a directory",
2209
            file_storage_dir)
2210
  else:
2211
    try:
2212
      os.makedirs(file_storage_dir, 0750)
2213
    except OSError, err:
2214
      _Fail("Cannot create file storage directory '%s': %s",
2215
            file_storage_dir, err, exc=True)
2216

    
2217

    
2218
def RemoveFileStorageDir(file_storage_dir):
2219
  """Remove file storage directory.
2220

2221
  Remove it only if it's empty. If not log an error and return.
2222

2223
  @type file_storage_dir: str
2224
  @param file_storage_dir: the directory we should cleanup
2225
  @rtype: tuple (success,)
2226
  @return: tuple of one element, C{success}, denoting
2227
      whether the operation was successful
2228

2229
  """
2230
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2231
  if os.path.exists(file_storage_dir):
2232
    if not os.path.isdir(file_storage_dir):
2233
      _Fail("Specified Storage directory '%s' is not a directory",
2234
            file_storage_dir)
2235
    # deletes dir only if empty, otherwise we want to fail the rpc call
2236
    try:
2237
      os.rmdir(file_storage_dir)
2238
    except OSError, err:
2239
      _Fail("Cannot remove file storage directory '%s': %s",
2240
            file_storage_dir, err)
2241

    
2242

    
2243
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2244
  """Rename the file storage directory.
2245

2246
  @type old_file_storage_dir: str
2247
  @param old_file_storage_dir: the current path
2248
  @type new_file_storage_dir: str
2249
  @param new_file_storage_dir: the name we should rename to
2250
  @rtype: tuple (success,)
2251
  @return: tuple of one element, C{success}, denoting
2252
      whether the operation was successful
2253

2254
  """
2255
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2256
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2257
  if not os.path.exists(new_file_storage_dir):
2258
    if os.path.isdir(old_file_storage_dir):
2259
      try:
2260
        os.rename(old_file_storage_dir, new_file_storage_dir)
2261
      except OSError, err:
2262
        _Fail("Cannot rename '%s' to '%s': %s",
2263
              old_file_storage_dir, new_file_storage_dir, err)
2264
    else:
2265
      _Fail("Specified storage dir '%s' is not a directory",
2266
            old_file_storage_dir)
2267
  else:
2268
    if os.path.exists(old_file_storage_dir):
2269
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2270
            old_file_storage_dir, new_file_storage_dir)
2271

    
2272

    
2273
def _EnsureJobQueueFile(file_name):
2274
  """Checks whether the given filename is in the queue directory.
2275

2276
  @type file_name: str
2277
  @param file_name: the file name we should check
2278
  @rtype: None
2279
  @raises RPCFail: if the file is not valid
2280

2281
  """
2282
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
2283
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2284

    
2285
  if not result:
2286
    _Fail("Passed job queue file '%s' does not belong to"
2287
          " the queue directory '%s'", file_name, queue_dir)
2288

    
2289

    
2290
def JobQueueUpdate(file_name, content):
2291
  """Updates a file in the queue directory.
2292

2293
  This is just a wrapper over L{utils.WriteFile}, with proper
2294
  checking.
2295

2296
  @type file_name: str
2297
  @param file_name: the job file name
2298
  @type content: str
2299
  @param content: the new job contents
2300
  @rtype: boolean
2301
  @return: the success of the operation
2302

2303
  """
2304
  _EnsureJobQueueFile(file_name)
2305

    
2306
  # Write and replace the file atomically
2307
  utils.WriteFile(file_name, data=_Decompress(content))
2308

    
2309

    
2310
def JobQueueRename(old, new):
2311
  """Renames a job queue file.
2312

2313
  This is just a wrapper over os.rename with proper checking.
2314

2315
  @type old: str
2316
  @param old: the old (actual) file name
2317
  @type new: str
2318
  @param new: the desired file name
2319
  @rtype: tuple
2320
  @return: the success of the operation and payload
2321

2322
  """
2323
  _EnsureJobQueueFile(old)
2324
  _EnsureJobQueueFile(new)
2325

    
2326
  utils.RenameFile(old, new, mkdir=True)
2327

    
2328

    
2329
def JobQueueSetDrainFlag(drain_flag):
2330
  """Set the drain flag for the queue.
2331

2332
  This will set or unset the queue drain flag.
2333

2334
  @type drain_flag: boolean
2335
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
2336
  @rtype: truple
2337
  @return: always True, None
2338
  @warning: the function always returns True
2339

2340
  """
2341
  if drain_flag:
2342
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2343
  else:
2344
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2345

    
2346

    
2347
def BlockdevClose(instance_name, disks):
2348
  """Closes the given block devices.
2349

2350
  This means they will be switched to secondary mode (in case of
2351
  DRBD).
2352

2353
  @param instance_name: if the argument is not empty, the symlinks
2354
      of this instance will be removed
2355
  @type disks: list of L{objects.Disk}
2356
  @param disks: the list of disks to be closed
2357
  @rtype: tuple (success, message)
2358
  @return: a tuple of success and message, where success
2359
      indicates the succes of the operation, and message
2360
      which will contain the error details in case we
2361
      failed
2362

2363
  """
2364
  bdevs = []
2365
  for cf in disks:
2366
    rd = _RecursiveFindBD(cf)
2367
    if rd is None:
2368
      _Fail("Can't find device %s", cf)
2369
    bdevs.append(rd)
2370

    
2371
  msg = []
2372
  for rd in bdevs:
2373
    try:
2374
      rd.Close()
2375
    except errors.BlockDeviceError, err:
2376
      msg.append(str(err))
2377
  if msg:
2378
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2379
  else:
2380
    if instance_name:
2381
      _RemoveBlockDevLinks(instance_name, disks)
2382

    
2383

    
2384
def ValidateHVParams(hvname, hvparams):
2385
  """Validates the given hypervisor parameters.
2386

2387
  @type hvname: string
2388
  @param hvname: the hypervisor name
2389
  @type hvparams: dict
2390
  @param hvparams: the hypervisor parameters to be validated
2391
  @rtype: None
2392

2393
  """
2394
  try:
2395
    hv_type = hypervisor.GetHypervisor(hvname)
2396
    hv_type.ValidateParameters(hvparams)
2397
  except errors.HypervisorError, err:
2398
    _Fail(str(err), log=False)
2399

    
2400

    
2401
def DemoteFromMC():
2402
  """Demotes the current node from master candidate role.
2403

2404
  """
2405
  # try to ensure we're not the master by mistake
2406
  master, myself = ssconf.GetMasterAndMyself()
2407
  if master == myself:
2408
    _Fail("ssconf status shows I'm the master node, will not demote")
2409
  pid_file = utils.DaemonPidFileName(constants.MASTERD)
2410
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2411
    _Fail("The master daemon is running, will not demote")
2412
  try:
2413
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2414
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2415
  except EnvironmentError, err:
2416
    if err.errno != errno.ENOENT:
2417
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2418
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2419

    
2420

    
2421
def _FindDisks(nodes_ip, disks):
2422
  """Sets the physical ID on disks and returns the block devices.
2423

2424
  """
2425
  # set the correct physical ID
2426
  my_name = utils.HostInfo().name
2427
  for cf in disks:
2428
    cf.SetPhysicalID(my_name, nodes_ip)
2429

    
2430
  bdevs = []
2431

    
2432
  for cf in disks:
2433
    rd = _RecursiveFindBD(cf)
2434
    if rd is None:
2435
      _Fail("Can't find device %s", cf)
2436
    bdevs.append(rd)
2437
  return bdevs
2438

    
2439

    
2440
def DrbdDisconnectNet(nodes_ip, disks):
2441
  """Disconnects the network on a list of drbd devices.
2442

2443
  """
2444
  bdevs = _FindDisks(nodes_ip, disks)
2445

    
2446
  # disconnect disks
2447
  for rd in bdevs:
2448
    try:
2449
      rd.DisconnectNet()
2450
    except errors.BlockDeviceError, err:
2451
      _Fail("Can't change network configuration to standalone mode: %s",
2452
            err, exc=True)
2453

    
2454

    
2455
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2456
  """Attaches the network on a list of drbd devices.
2457

2458
  """
2459
  bdevs = _FindDisks(nodes_ip, disks)
2460

    
2461
  if multimaster:
2462
    for idx, rd in enumerate(bdevs):
2463
      try:
2464
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2465
      except EnvironmentError, err:
2466
        _Fail("Can't create symlink: %s", err)
2467
  # reconnect disks, switch to new master configuration and if
2468
  # needed primary mode
2469
  for rd in bdevs:
2470
    try:
2471
      rd.AttachNet(multimaster)
2472
    except errors.BlockDeviceError, err:
2473
      _Fail("Can't change network configuration: %s", err)
2474
  # wait until the disks are connected; we need to retry the re-attach
2475
  # if the device becomes standalone, as this might happen if the one
2476
  # node disconnects and reconnects in a different mode before the
2477
  # other node reconnects; in this case, one or both of the nodes will
2478
  # decide it has wrong configuration and switch to standalone
2479
  RECONNECT_TIMEOUT = 2 * 60
2480
  sleep_time = 0.100 # start with 100 miliseconds
2481
  timeout_limit = time.time() + RECONNECT_TIMEOUT
2482
  while time.time() < timeout_limit:
2483
    all_connected = True
2484
    for rd in bdevs:
2485
      stats = rd.GetProcStatus()
2486
      if not (stats.is_connected or stats.is_in_resync):
2487
        all_connected = False
2488
      if stats.is_standalone:
2489
        # peer had different config info and this node became
2490
        # standalone, even though this should not happen with the
2491
        # new staged way of changing disk configs
2492
        try:
2493
          rd.AttachNet(multimaster)
2494
        except errors.BlockDeviceError, err:
2495
          _Fail("Can't change network configuration: %s", err)
2496
    if all_connected:
2497
      break
2498
    time.sleep(sleep_time)
2499
    sleep_time = min(5, sleep_time * 1.5)
2500
  if not all_connected:
2501
    _Fail("Timeout in disk reconnecting")
2502
  if multimaster:
2503
    # change to primary mode
2504
    for rd in bdevs:
2505
      try:
2506
        rd.Open()
2507
      except errors.BlockDeviceError, err:
2508
        _Fail("Can't change to primary mode: %s", err)
2509

    
2510

    
2511
def DrbdWaitSync(nodes_ip, disks):
2512
  """Wait until DRBDs have synchronized.
2513

2514
  """
2515
  bdevs = _FindDisks(nodes_ip, disks)
2516

    
2517
  min_resync = 100
2518
  alldone = True
2519
  for rd in bdevs:
2520
    stats = rd.GetProcStatus()
2521
    if not (stats.is_connected or stats.is_in_resync):
2522
      _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2523
    alldone = alldone and (not stats.is_in_resync)
2524
    if stats.sync_percent is not None:
2525
      min_resync = min(min_resync, stats.sync_percent)
2526

    
2527
  return (alldone, min_resync)
2528

    
2529

    
2530
def PowercycleNode(hypervisor_type):
2531
  """Hard-powercycle the node.
2532

2533
  Because we need to return first, and schedule the powercycle in the
2534
  background, we won't be able to report failures nicely.
2535

2536
  """
2537
  hyper = hypervisor.GetHypervisor(hypervisor_type)
2538
  try:
2539
    pid = os.fork()
2540
  except OSError:
2541
    # if we can't fork, we'll pretend that we're in the child process
2542
    pid = 0
2543
  if pid > 0:
2544
    return "Reboot scheduled in 5 seconds"
2545
  time.sleep(5)
2546
  hyper.PowercycleNode()
2547

    
2548

    
2549
class HooksRunner(object):
2550
  """Hook runner.
2551

2552
  This class is instantiated on the node side (ganeti-noded) and not
2553
  on the master side.
2554

2555
  """
2556
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2557

    
2558
  def __init__(self, hooks_base_dir=None):
2559
    """Constructor for hooks runner.
2560

2561
    @type hooks_base_dir: str or None
2562
    @param hooks_base_dir: if not None, this overrides the
2563
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
2564

2565
    """
2566
    if hooks_base_dir is None:
2567
      hooks_base_dir = constants.HOOKS_BASE_DIR
2568
    self._BASE_DIR = hooks_base_dir
2569

    
2570
  @staticmethod
2571
  def ExecHook(script, env):
2572
    """Exec one hook script.
2573

2574
    @type script: str
2575
    @param script: the full path to the script
2576
    @type env: dict
2577
    @param env: the environment with which to exec the script
2578
    @rtype: tuple (success, message)
2579
    @return: a tuple of success and message, where success
2580
        indicates the succes of the operation, and message
2581
        which will contain the error details in case we
2582
        failed
2583

2584
    """
2585
    # exec the process using subprocess and log the output
2586
    fdstdin = None
2587
    try:
2588
      fdstdin = open("/dev/null", "r")
2589
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2590
                               stderr=subprocess.STDOUT, close_fds=True,
2591
                               shell=False, cwd="/", env=env)
2592
      output = ""
2593
      try:
2594
        output = child.stdout.read(4096)
2595
        child.stdout.close()
2596
      except EnvironmentError, err:
2597
        output += "Hook script error: %s" % str(err)
2598

    
2599
      while True:
2600
        try:
2601
          result = child.wait()
2602
          break
2603
        except EnvironmentError, err:
2604
          if err.errno == errno.EINTR:
2605
            continue
2606
          raise
2607
    finally:
2608
      # try not to leak fds
2609
      for fd in (fdstdin, ):
2610
        if fd is not None:
2611
          try:
2612
            fd.close()
2613
          except EnvironmentError, err:
2614
            # just log the error
2615
            #logging.exception("Error while closing fd %s", fd)
2616
            pass
2617

    
2618
    return result == 0, utils.SafeEncode(output.strip())
2619

    
2620
  def RunHooks(self, hpath, phase, env):
2621
    """Run the scripts in the hooks directory.
2622

2623
    @type hpath: str
2624
    @param hpath: the path to the hooks directory which
2625
        holds the scripts
2626
    @type phase: str
2627
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
2628
        L{constants.HOOKS_PHASE_POST}
2629
    @type env: dict
2630
    @param env: dictionary with the environment for the hook
2631
    @rtype: list
2632
    @return: list of 3-element tuples:
2633
      - script path
2634
      - script result, either L{constants.HKR_SUCCESS} or
2635
        L{constants.HKR_FAIL}
2636
      - output of the script
2637

2638
    @raise errors.ProgrammerError: for invalid input
2639
        parameters
2640

2641
    """
2642
    if phase == constants.HOOKS_PHASE_PRE:
2643
      suffix = "pre"
2644
    elif phase == constants.HOOKS_PHASE_POST:
2645
      suffix = "post"
2646
    else:
2647
      _Fail("Unknown hooks phase '%s'", phase)
2648

    
2649
    rr = []
2650

    
2651
    subdir = "%s-%s.d" % (hpath, suffix)
2652
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2653
    try:
2654
      dir_contents = utils.ListVisibleFiles(dir_name)
2655
    except OSError:
2656
      # FIXME: must log output in case of failures
2657
      return rr
2658

    
2659
    # we use the standard python sort order,
2660
    # so 00name is the recommended naming scheme
2661
    dir_contents.sort()
2662
    for relname in dir_contents:
2663
      fname = os.path.join(dir_name, relname)
2664
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2665
          self.RE_MASK.match(relname) is not None):
2666
        rrval = constants.HKR_SKIP
2667
        output = ""
2668
      else:
2669
        result, output = self.ExecHook(fname, env)
2670
        if not result:
2671
          rrval = constants.HKR_FAIL
2672
        else:
2673
          rrval = constants.HKR_SUCCESS
2674
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
2675

    
2676
    return rr
2677

    
2678

    
2679
class IAllocatorRunner(object):
2680
  """IAllocator runner.
2681

2682
  This class is instantiated on the node side (ganeti-noded) and not on
2683
  the master side.
2684

2685
  """
2686
  def Run(self, name, idata):
2687
    """Run an iallocator script.
2688

2689
    @type name: str
2690
    @param name: the iallocator script name
2691
    @type idata: str
2692
    @param idata: the allocator input data
2693

2694
    @rtype: tuple
2695
    @return: two element tuple of:
2696
       - status
2697
       - either error message or stdout of allocator (for success)
2698

2699
    """
2700
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2701
                                  os.path.isfile)
2702
    if alloc_script is None:
2703
      _Fail("iallocator module '%s' not found in the search path", name)
2704

    
2705
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2706
    try:
2707
      os.write(fd, idata)
2708
      os.close(fd)
2709
      result = utils.RunCmd([alloc_script, fin_name])
2710
      if result.failed:
2711
        _Fail("iallocator module '%s' failed: %s, output '%s'",
2712
              name, result.fail_reason, result.output)
2713
    finally:
2714
      os.unlink(fin_name)
2715

    
2716
    return result.stdout
2717

    
2718

    
2719
class DevCacheManager(object):
2720
  """Simple class for managing a cache of block device information.
2721

2722
  """
2723
  _DEV_PREFIX = "/dev/"
2724
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2725

    
2726
  @classmethod
2727
  def _ConvertPath(cls, dev_path):
2728
    """Converts a /dev/name path to the cache file name.
2729

2730
    This replaces slashes with underscores and strips the /dev
2731
    prefix. It then returns the full path to the cache file.
2732

2733
    @type dev_path: str
2734
    @param dev_path: the C{/dev/} path name
2735
    @rtype: str
2736
    @return: the converted path name
2737

2738
    """
2739
    if dev_path.startswith(cls._DEV_PREFIX):
2740
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2741
    dev_path = dev_path.replace("/", "_")
2742
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2743
    return fpath
2744

    
2745
  @classmethod
2746
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2747
    """Updates the cache information for a given device.
2748

2749
    @type dev_path: str
2750
    @param dev_path: the pathname of the device
2751
    @type owner: str
2752
    @param owner: the owner (instance name) of the device
2753
    @type on_primary: bool
2754
    @param on_primary: whether this is the primary
2755
        node nor not
2756
    @type iv_name: str
2757
    @param iv_name: the instance-visible name of the
2758
        device, as in objects.Disk.iv_name
2759

2760
    @rtype: None
2761

2762
    """
2763
    if dev_path is None:
2764
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2765
      return
2766
    fpath = cls._ConvertPath(dev_path)
2767
    if on_primary:
2768
      state = "primary"
2769
    else:
2770
      state = "secondary"
2771
    if iv_name is None:
2772
      iv_name = "not_visible"
2773
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2774
    try:
2775
      utils.WriteFile(fpath, data=fdata)
2776
    except EnvironmentError, err:
2777
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2778

    
2779
  @classmethod
2780
  def RemoveCache(cls, dev_path):
2781
    """Remove data for a dev_path.
2782

2783
    This is just a wrapper over L{utils.RemoveFile} with a converted
2784
    path name and logging.
2785

2786
    @type dev_path: str
2787
    @param dev_path: the pathname of the device
2788

2789
    @rtype: None
2790

2791
    """
2792
    if dev_path is None:
2793
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2794
      return
2795
    fpath = cls._ConvertPath(dev_path)
2796
    try:
2797
      utils.RemoveFile(fpath)
2798
    except EnvironmentError, err:
2799
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)