Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 6d2e83d5

History | View | Annotate | Download (75.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36
import zlib
37
import base64
38

    
39
from ganeti import errors
40
from ganeti import utils
41
from ganeti import ssh
42
from ganeti import hypervisor
43
from ganeti import constants
44
from ganeti import bdev
45
from ganeti import objects
46
from ganeti import ssconf
47

    
48

    
49
def _GetConfig():
  """Build and return a simple configuration store.

  @rtype: L{ssconf.SimpleStore}
  @return: a new SimpleStore instance

  """
  store = ssconf.SimpleStore()
  return store
57

    
58

    
59
def _GetSshRunner(cluster_name):
  """Build an SshRunner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, required by the
      SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a new SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
70

    
71

    
72
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  encoding, content = data
  # dispatch on the encoding tag chosen by the RPC client
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  raise AssertionError("Unknown data encoding")
90

    
91

    
92
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # normalize the excluded names so they compare equal to ours below
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name not in exclude:
      # only plain files are removed; symlinks and directories survive
      if os.path.isfile(full_name) and not os.path.islink(full_name):
        utils.RemoveFile(full_name)
116

    
117

    
118
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  # the queue directory keeps its lock file; the archive loses everything
  for dir_path, keep in ((constants.QUEUE_DIR,
                          [constants.JOB_QUEUE_LOCK_FILE]),
                         (constants.JOB_QUEUE_ARCHIVE_DIR, [])):
    _CleanDirectory(dir_path, exclude=keep)
126

    
127

    
128
def GetMasterInfo():
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: (master_netdev, master_ip, master_name) if we have a good
      configuration, otherwise (None, None, None)

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError:
    # the traceback is recorded via logging.exception; callers detect
    # the failure through the None values (the bound name was unused)
    logging.exception("Cluster configuration incomplete")
    return (None, None, None)
  return (master_netdev, master_ip, master_node)
148

    
149

    
150
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try to activate the IP address of the
  master (unless someone else has it). It will also start the master
  daemons, based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    # send gratuitous ARPs so that peers refresh their caches; the
    # count option and its value must be separate argv elements
    result = utils.RunCmd(["arping", "-q", "-U", "-c", "3",
                           "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
195

    
196

    
197
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  removal = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                          "dev", master_netdev])
  if removal.failed:
    # log it, but otherwise ignore the failure
    logging.error("Can't remove the master IP, error: %s", removal.output)

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in (constants.RAPI_PID, constants.MASTERD_PID):
      pid = utils.ReadPidFile(utils.DaemonPidFileName(daemon))
      utils.KillProcess(pid)

  return True
226

    
227

    
228
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229
  """Joins this node to the cluster.
230

231
  This does the following:
232
      - updates the hostkeys of the machine (rsa and dsa)
233
      - adds the ssh private key to the user
234
      - adds the ssh public key to the users' authorized_keys file
235

236
  @type dsa: str
237
  @param dsa: the DSA private key to write
238
  @type dsapub: str
239
  @param dsapub: the DSA public key to write
240
  @type rsa: str
241
  @param rsa: the RSA private key to write
242
  @type rsapub: str
243
  @param rsapub: the RSA public key to write
244
  @type sshkey: str
245
  @param sshkey: the SSH private key to write
246
  @type sshpub: str
247
  @param sshpub: the SSH public key to write
248
  @rtype: boolean
249
  @return: the success of the operation
250

251
  """
252
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256
  for name, content, mode in sshd_keys:
257
    utils.WriteFile(name, data=content, mode=mode)
258

    
259
  try:
260
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261
                                                    mkdir=True)
262
  except errors.OpExecError, err:
263
    logging.exception("Error while processing user ssh files")
264
    return False
265

    
266
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
267
    utils.WriteFile(name, data=content, mode=0600)
268

    
269
  utils.AddAuthorizedKey(auth_keys, sshpub)
270

    
271
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
272

    
273
  return True
274

    
275

    
276
def LeaveCluster():
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  # drop our own public key from authorized_keys, then delete the pair
  pub_fd = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, pub_fd.read(8192))
  finally:
    pub_fd.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
307

    
308

    
309
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  vginfo = _GetVGInfo(vgname)
  outputarray = {
    'vg_size': vginfo['vg_size'],
    'vg_free': vginfo['vg_free'],
    }

  # merge in the hypervisor-provided memory information, if any
  hyp_info = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # expose the kernel's boot id as well
  boot_id_file = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = boot_id_file.read(128).rstrip("\n")
  finally:
    boot_id_file.close()

  return outputarray
343

    
344

    
345
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  # each NV_* key is handled independently; only the keys present in
  # the input dict produce entries in the result
  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # shuffle so that not all nodes hammer the same targets in the
    # same order (note: mutates the caller-supplied list in place)
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        # only failures are reported; an empty dict means all passed
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own primary/secondary IPs in the supplied node list,
    # to use as source addresses for the connectivity tests
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # nodes without a separate secondary IP repeat the primary;
        # skip the redundant test in that case
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = ListVolumeGroups()

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = constants.PROTOCOL_VERSION

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceErrors:
      # best-effort: report an empty minor list rather than failing
      # the whole verify
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = []
    result[constants.NV_DRBDLIST] = used_minors

  return result
444

    
445

    
446
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  # raw string so the backslash escapes reach the regexp engine
  # unmangled (the pattern relies on "\|" meaning a literal pipe)
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
485

    
486

    
487
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary mapping volume group name to its size

  """
  # the actual work lives in the utils module; this wrapper exists so
  # the node daemon can expose it uniformly with the other queries
  return utils.ListVolumeGroups()
496

    
497

    
498
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return []

  def parse_dev(dev):
    # drop everything from the first '(' on; a plain device name
    # passes through unchanged
    return dev.split('(')[0]

  volumes = []
  for line in result.stdout.splitlines():
    fields = line.split('|')
    # a valid line has at least the four requested fields
    if len(fields) < 4:
      continue
    volumes.append({
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': parse_dev(fields[2].strip()),
      'vg': fields[3].strip(),
      })

  return volumes
541

    
542

    
543
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  # stop at the first missing bridge
  for bridge in bridges_list:
    if utils.BridgeExists(bridge):
      continue
    return False

  return True
555

    
556

    
557
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError:
      logging.exception("Error enumerating instances for hypervisor %s", hname)
      # FIXME: should we somehow not propagate this to the master?
      raise

  return results
580

    
581

    
582
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    # unknown instance: empty result
    return {}

  # expose only the fields callers expect, by position in the
  # hypervisor's info tuple
  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
606

    
607

    
608
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if instance.name not in hyper.ListInstances():
    return (False, 'not running')

  # every disk must have its runtime symlink in place
  for idx, _ in enumerate(instance.disks):
    if not os.path.islink(_GetBlockDevSymlinkPath(instance.name, idx)):
      return (False, 'not restarted since ganeti 1.2.5')

  return (True, '')
630

    
631

    
632
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if not iinfo:
      continue
    for name, _, memory, vcpus, state, times in iinfo:
      value = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }
      # the same instance name may be reported by several hypervisors;
      # that is only acceptable when the reported data agrees
      if name in output and output[name] != value:
        raise errors.HypervisorError("Instance %s running duplicate"
                                     " with different parameters" % name)
      output[name] = value

  return output
668

    
669

    
670
def AddOSToInstance(instance):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @rtype: tuple
  @return: (success, message) pair; on failure the message includes
      the last lines of the create script's log file

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance)

  # one log file per invocation, named after the OS, instance and time
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    # include the tail of the script's log in the returned message,
    # escaped so it survives transport as a plain string
    lines = [val.encode("string_escape")
             for val in utils.TailFile(logfile, lines=20)]
    return (False, "OS create script failed (%s), last lines in the"
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Successfully installed")
698

    
699

    
700
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance)
  # expose the previous name to the rename script
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # this is the rename script, not the create one
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    return False

  return True
729

    
730

    
731
def _GetVGInfo(vg_name):
  """Get informations about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  # start with all-None values; any early return reports failure
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError:
      # the traceback (with the offending value) is recorded by
      # logging.exception; the exception object itself was unused
      logging.exception("Fail to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
769

    
770

    
771
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Compute the path of the symlink for an instance's disk.

  @type instance_name: str
  @param instance_name: the name of the owning instance
  @type idx: int
  @param idx: the index of the disk
  @rtype: str
  @return: path under DISK_LINKS_DIR, named "<instance_name>:<idx>"

  """
  return os.path.join(constants.DISK_LINKS_DIR,
                      "%s:%d" % (instance_name, idx))
774

    
775

    
776
def _SymlinkBlockDev(instance_name, device_path, idx):
777
  """Set up symlinks to a instance's block device.
778

779
  This is an auxiliary function run when an instance is start (on the primary
780
  node) or when an instance is migrated (on the target node).
781

782

783
  @param instance_name: the name of the target instance
784
  @param device_path: path of the physical block device, on the node
785
  @param idx: the disk index
786
  @return: absolute path to the disk's symlink
787

788
  """
789
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
790
  try:
791
    os.symlink(device_path, link_name)
792
  except OSError, err:
793
    if err.errno == errno.EEXIST:
794
      if (not os.path.islink(link_name) or
795
          os.readlink(link_name) != device_path):
796
        os.remove(link_name)
797
        os.symlink(device_path, link_name)
798
    else:
799
      raise
800

    
801
  return link_name
802

    
803

    
804
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @type instance_name: str
  @param instance_name: the name of the owning instance
  @type disks: list
  @param disks: the instance's disks; only their count is used here,
      to derive the symlink names

  """
  # only the disk index matters; the disk objects themselves are unused
  for idx in range(len(disks)):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        # best-effort: log and continue with the remaining links
        logging.exception("Can't remove symlink '%s'", link_name)
815

    
816

    
817
def _GatherAndLinkBlockDevs(instance):
818
  """Set up an instance's block device(s).
819

820
  This is run on the primary node at instance startup. The block
821
  devices must be already assembled.
822

823
  @type instance: L{objects.Instance}
824
  @param instance: the instance whose disks we shoul assemble
825
  @rtype: list
826
  @return: list of (disk_object, device_path)
827

828
  """
829
  block_devices = []
830
  for idx, disk in enumerate(instance.disks):
831
    device = _RecursiveFindBD(disk)
832
    if device is None:
833
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
834
                                    str(disk))
835
    device.Open()
836
    try:
837
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
838
    except OSError, e:
839
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
840
                                    e.strerror)
841

    
842
    block_devices.append((disk, link_name))
843

    
844
  return block_devices
845

    
846

    
847
def StartInstance(instance, extra_args):
848
  """Start an instance.
849

850
  @type instance: L{objects.Instance}
851
  @param instance: the instance object
852
  @rtype: boolean
853
  @return: whether the startup was successful or not
854

855
  """
856
  running_instances = GetInstanceList([instance.hypervisor])
857

    
858
  if instance.name in running_instances:
859
    return (True, "Already running")
860

    
861
  try:
862
    block_devices = _GatherAndLinkBlockDevs(instance)
863
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
864
    hyper.StartInstance(instance, block_devices, extra_args)
865
  except errors.BlockDeviceError, err:
866
    logging.exception("Failed to start instance")
867
    return (False, "Block device error: %s" % str(err))
868
  except errors.HypervisorError, err:
869
    logging.exception("Failed to start instance")
870
    _RemoveBlockDevLinks(instance.name, instance.disks)
871
    return (False, "Hypervisor error: %s" % str(err))
872

    
873
  return (True, "Instance started successfully")
874

    
875

    
876
def ShutdownInstance(instance):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: boolean
  @return: whether the shutdown was successful or not

  """
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])

  if instance.name not in running_instances:
    return True

  hyper = hypervisor.GetHypervisor(hv_name)
  try:
    hyper.StopInstance(instance)
  except errors.HypervisorError:
    # record the traceback too, consistently with the forced-stop
    # handler below
    logging.exception("Failed to stop instance")
    return False

  # test every 10secs for 2min

  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList([hv_name]):
      break
    time.sleep(10)
  else:
    # the shutdown did not succeed
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError:
      logging.exception("Failed to stop instance")
      return False

    time.sleep(1)
    if instance.name in GetInstanceList([hv_name]):
      logging.error("could not shutdown instance '%s' even by destroy",
                    instance.name)
      return False

  _RemoveBlockDevLinks(instance.name, instance.disks)

  return True
926

    
927

    
928
def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - any other reboot type is not accepted here, since
        those modes are handled differently (not on this node)
  @param extra_args: extra arguments forwarded to L{StartInstance}
      when doing a hard reboot
  @rtype: boolean
  @return: the success of the operation
  @raise errors.ParameterError: if the reboot type is not one of
      the two supported constants

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")
    return False

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError:
      logging.exception("Failed to soft reboot instance")
      return False
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    # a hard reboot is implemented as a full stop plus start cycle
    try:
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError:
      logging.exception("Failed to hard reboot instance")
      return False
  else:
    raise errors.ParameterError("reboot_type invalid")

  return True
def MigrateInstance(instance, target, live):
974
  """Migrates an instance to another node.
975

976
  @type instance: L{objects.Instance}
977
  @param instance: the instance definition
978
  @type target: string
979
  @param target: the target node name
980
  @type live: boolean
981
  @param live: whether the migration should be done live or not (the
982
      interpretation of this parameter is left to the hypervisor)
983
  @rtype: tuple
984
  @return: a tuple of (success, msg) where:
985
      - succes is a boolean denoting the success/failure of the operation
986
      - msg is a string with details in case of failure
987

988
  """
989
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
990

    
991
  try:
992
    hyper.MigrateInstance(instance.name, target, live)
993
  except errors.HypervisorError, err:
994
    msg = "Failed to migrate instance"
995
    logging.exception(msg)
996
    return (False, "%s: %s" % (msg, err))
997
  return (True, "Migration successfull")
998

    
999

    
1000
def CreateBlockDevice(disk, size, owner, on_primary, info):
1001
  """Creates a block device for an instance.
1002

1003
  @type disk: L{objects.Disk}
1004
  @param disk: the object describing the disk we should create
1005
  @type size: int
1006
  @param size: the size of the physical underlying device, in MiB
1007
  @type owner: str
1008
  @param owner: the name of the instance for which disk is created,
1009
      used for device cache data
1010
  @type on_primary: boolean
1011
  @param on_primary:  indicates if it is the primary node or not
1012
  @type info: string
1013
  @param info: string that will be sent to the physical device
1014
      creation, used for example to set (LVM) tags on LVs
1015

1016
  @return: the new unique_id of the device (this can sometime be
1017
      computed only after creation), or None. On secondary nodes,
1018
      it's not required to return anything.
1019

1020
  """
1021
  clist = []
1022
  if disk.children:
1023
    for child in disk.children:
1024
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
1025
      if on_primary or disk.AssembleOnSecondary():
1026
        # we need the children open in case the device itself has to
1027
        # be assembled
1028
        crdev.Open()
1029
      clist.append(crdev)
1030

    
1031
  try:
1032
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1033
  except errors.GenericError, err:
1034
    return False, "Can't create block device: %s" % str(err)
1035

    
1036
  if on_primary or disk.AssembleOnSecondary():
1037
    if not device.Assemble():
1038
      errorstring = "Can't assemble device after creation, very unusual event"
1039
      logging.error(errorstring)
1040
      return False, errorstring
1041
    device.SetSyncSpeed(constants.SYNC_SPEED)
1042
    if on_primary or disk.OpenOnSecondary():
1043
      device.Open(force=True)
1044
    DevCacheManager.UpdateCache(device.dev_path, owner,
1045
                                on_primary, disk.iv_name)
1046

    
1047
  device.SetInfo(info)
1048

    
1049
  physical_id = device.unique_id
1050
  return True, physical_id
1051

    
1052

    
1053
def RemoveBlockDevice(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None

  if rdev is None:
    # nothing found to remove, count this as a success
    result = True
  else:
    dev_path = rdev.dev_path
    result = rdev.Remove()
    if result:
      DevCacheManager.RemoveCache(dev_path)

  # recurse into the children, if any
  for child in (disk.children or []):
    result = result and RemoveBlockDevice(child)
  return result
def _RecursiveAssembleBD(disk, owner, as_primary):
1084
  """Activate a block device for an instance.
1085

1086
  This is run on the primary and secondary nodes for an instance.
1087

1088
  @note: this function is called recursively.
1089

1090
  @type disk: L{objects.Disk}
1091
  @param disk: the disk we try to assemble
1092
  @type owner: str
1093
  @param owner: the name of the instance which owns the disk
1094
  @type as_primary: boolean
1095
  @param as_primary: if we should make the block device
1096
      read/write
1097

1098
  @return: the assembled device or None (in case no device
1099
      was assembled)
1100
  @raise errors.BlockDeviceError: in case there is an error
1101
      during the activation of the children or the device
1102
      itself
1103

1104
  """
1105
  children = []
1106
  if disk.children:
1107
    mcn = disk.ChildrenNeeded()
1108
    if mcn == -1:
1109
      mcn = 0 # max number of Nones allowed
1110
    else:
1111
      mcn = len(disk.children) - mcn # max number of Nones
1112
    for chld_disk in disk.children:
1113
      try:
1114
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1115
      except errors.BlockDeviceError, err:
1116
        if children.count(None) >= mcn:
1117
          raise
1118
        cdev = None
1119
        logging.debug("Error in child activation: %s", str(err))
1120
      children.append(cdev)
1121

    
1122
  if as_primary or disk.AssembleOnSecondary():
1123
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1124
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1125
    result = r_dev
1126
    if as_primary or disk.OpenOnSecondary():
1127
      r_dev.Open()
1128
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1129
                                as_primary, disk.iv_name)
1130

    
1131
  else:
1132
    result = True
1133
  return result
1134

    
1135

    
1136
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  assembled = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(assembled, bdev.BlockDev):
    # on the primary we get back a device object; callers want its path
    return assembled.dev_path
  return assembled
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: boolean
  @return: the success of the operation

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    # not assembled at all, nothing to shut down
    result = True
  else:
    dev_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      DevCacheManager.RemoveCache(dev_path)

  # shut down the children regardless of the parent's state
  for child in (disk.children or []):
    result = result and ShutdownBlockDevice(child)
  return result
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False

  # resolve every new child; all of them must be found
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in new_bdevs:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False

  parent_bdev.AddChildren(new_bdevs)
  return True
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False

  # translate each child disk into a device path, preferring the
  # static path when the disk declares one
  devs = []
  for disk in new_cdevs:
    static_path = disk.StaticDevPath()
    if static_path is not None:
      devs.append(static_path)
      continue
    found = _RecursiveFindBD(disk)
    if found is None:
      logging.error("Can't find dynamic device %s while removing children",
                    disk)
      return False
    devs.append(found.dev_path)

  parent_bdev.RemoveChildren(devs)
  return True
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for current in disks:
    found = _RecursiveFindBD(current)
    if found is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(current))
    stats.append(found.CombinedSyncStatus())
  return stats
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  if disk.children:
    # look up every child first; FindDevice needs the resolved list
    children = [_RecursiveFindBD(child) for child in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
def FindBlockDevice(disk):
  """Check if a device is activated.

  If it is, return informations about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or tuple
  @return: None if the disk cannot be found, otherwise a
      tuple (device_path, major, minor, sync_percent,
      estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return None
  # static identification data plus the live sync status
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: boolean
  @return: the success of the operation; errors are logged
      in the node daemon log

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # whitelist of cluster configuration files the master may replace
  allowed_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                 " upload targets: '%s'", file_name)
    return False

  # contents arrive compressed/encoded over the wire
  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  _GetConfig().WriteFiles(values)
def _ErrnoOrStr(err):
1360
  """Format an EnvironmentError exception.
1361

1362
  If the L{err} argument has an errno attribute, it will be looked up
1363
  and converted into a textual C{E...} description. Otherwise the
1364
  string representation of the error will be returned.
1365

1366
  @type err: L{EnvironmentError}
1367
  @param err: the exception to format
1368

1369
  """
1370
  if hasattr(err, 'errno'):
1371
    detail = errno.errorcode[err.errno]
1372
  else:
1373
    detail = str(err)
1374
  return detail
1375

    
1376

    
1377
def _OSOndiskVersion(name, os_dir):
1378
  """Compute and return the API version of a given OS.
1379

1380
  This function will try to read the API version of the OS given by
1381
  the 'name' parameter and residing in the 'os_dir' directory.
1382

1383
  @type name: str
1384
  @param name: the OS name we should look for
1385
  @type os_dir: str
1386
  @param os_dir: the directory inwhich we should look for the OS
1387
  @rtype: int or None
1388
  @return:
1389
      Either an integer denoting the version or None in the
1390
      case when this is not a valid OS name.
1391
  @raise errors.InvalidOS: if the OS cannot be found
1392

1393
  """
1394
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1395

    
1396
  try:
1397
    st = os.stat(api_file)
1398
  except EnvironmentError, err:
1399
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1400
                           " found (%s)" % _ErrnoOrStr(err))
1401

    
1402
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1403
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1404
                           " a regular file")
1405

    
1406
  try:
1407
    f = open(api_file)
1408
    try:
1409
      api_versions = f.readlines()
1410
    finally:
1411
      f.close()
1412
  except EnvironmentError, err:
1413
    raise errors.InvalidOS(name, os_dir, "error while reading the"
1414
                           " API version (%s)" % _ErrnoOrStr(err))
1415

    
1416
  api_versions = [version.strip() for version in api_versions]
1417
  try:
1418
    api_versions = [int(version) for version in api_versions]
1419
  except (TypeError, ValueError), err:
1420
    raise errors.InvalidOS(name, os_dir,
1421
                           "API version is not integer (%s)" % str(err))
1422

    
1423
  return api_versions
1424

    
1425

    
1426
def DiagnoseOS(top_dirs=None):
1427
  """Compute the validity for all OSes.
1428

1429
  @type top_dirs: list
1430
  @param top_dirs: the list of directories in which to
1431
      search (if not given defaults to
1432
      L{constants.OS_SEARCH_PATH})
1433
  @rtype: list of L{objects.OS}
1434
  @return: an OS object for each name in all the given
1435
      directories
1436

1437
  """
1438
  if top_dirs is None:
1439
    top_dirs = constants.OS_SEARCH_PATH
1440

    
1441
  result = []
1442
  for dir_name in top_dirs:
1443
    if os.path.isdir(dir_name):
1444
      try:
1445
        f_names = utils.ListVisibleFiles(dir_name)
1446
      except EnvironmentError, err:
1447
        logging.exception("Can't list the OS directory %s", dir_name)
1448
        break
1449
      for name in f_names:
1450
        try:
1451
          os_inst = OSFromDisk(name, base_dir=dir_name)
1452
          result.append(os_inst)
1453
        except errors.InvalidOS, err:
1454
          result.append(objects.OS.FromInvalidOS(err))
1455

    
1456
  return result
1457

    
1458

    
1459
def OSFromDisk(name, base_dir=None):
1460
  """Create an OS instance from disk.
1461

1462
  This function will return an OS instance if the given name is a
1463
  valid OS name. Otherwise, it will raise an appropriate
1464
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1465

1466
  @type base_dir: string
1467
  @keyword base_dir: Base directory containing OS installations.
1468
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1469
  @rtype: L{objects.OS}
1470
  @return: the OS instance if we find a valid one
1471
  @raise errors.InvalidOS: if we don't find a valid OS
1472

1473
  """
1474
  if base_dir is None:
1475
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1476
    if os_dir is None:
1477
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
1478
  else:
1479
    os_dir = os.path.sep.join([base_dir, name])
1480

    
1481
  api_versions = _OSOndiskVersion(name, os_dir)
1482

    
1483
  if constants.OS_API_VERSION not in api_versions:
1484
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
1485
                           " (found %s want %s)"
1486
                           % (api_versions, constants.OS_API_VERSION))
1487

    
1488
  # OS Scripts dictionary, we will populate it with the actual script names
1489
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1490

    
1491
  for script in os_scripts:
1492
    os_scripts[script] = os.path.sep.join([os_dir, script])
1493

    
1494
    try:
1495
      st = os.stat(os_scripts[script])
1496
    except EnvironmentError, err:
1497
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1498
                             (script, _ErrnoOrStr(err)))
1499

    
1500
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1501
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1502
                             script)
1503

    
1504
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1505
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1506
                             script)
1507

    
1508

    
1509
  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1510
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1511
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1512
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1513
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1514
                    api_versions=api_versions)
1515

    
1516
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  env = {
    'OS_API_VERSION': '%d' % constants.OS_API_VERSION,
    'INSTANCE_NAME': instance.name,
    'HYPERVISOR': instance.hypervisor,
    'DISK_COUNT': '%d' % len(instance.disks),
    'NIC_COUNT': '%d' % len(instance.nics),
    'DEBUG_LEVEL': '%d' % debug,
    }

  # per-disk variables: path, access mode, frontend/backend types
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    env['DISK_%d_PATH' % idx] = real_disk.dev_path
    # FIXME: When disks will have read-only mode, populate this
    env['DISK_%d_ACCESS' % idx] = 'W'
    if constants.HV_DISK_TYPE in instance.hvparams:
      env['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      env['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      env['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]

  # per-NIC variables: MAC, optional IP, bridge, frontend type
  for idx, nic in enumerate(instance.nics):
    env['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      env['NIC_%d_IP' % idx] = nic.ip
    env['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      env['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  return env
def GrowBlockDevice(disk, amount):
1565
  """Grow a stack of block devices.
1566

1567
  This function is called recursively, with the childrens being the
1568
  first ones to resize.
1569

1570
  @type disk: L{objects.Disk}
1571
  @param disk: the disk to be grown
1572
  @rtype: (status, result)
1573
  @return: a tuple with the status of the operation
1574
      (True/False), and the errors message if status
1575
      is False
1576

1577
  """
1578
  r_dev = _RecursiveFindBD(disk)
1579
  if r_dev is None:
1580
    return False, "Cannot find block device %s" % (disk,)
1581

    
1582
  try:
1583
    r_dev.Grow(amount)
1584
  except errors.BlockDeviceError, err:
1585
    return False, str(err)
1586

    
1587
  return True, None
1588

    
1589

    
1590
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # a single child: recurse directly into it
      return SnapshotBlockDevice(disk.children[0])
    # more than one child: pick the first one whose size matches ours
    for child in disk.children:
      if child.size == disk.size:
        return SnapshotBlockDevice(child)
    # no matching child found
    return None

  if disk.dev_type == constants.LD_LV:
    leaf_dev = _RecursiveFindBD(disk)
    if leaf_dev is None:
      return None
    # let's stay on the safe side and ask for the full size, for now
    return leaf_dev.Snapshot(disk.size)

  raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                               " '%s' of type '%s'" %
                               (disk.unique_id, disk.dev_type))
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1626
  """Export a block device snapshot to a remote node.
1627

1628
  @type disk: L{objects.Disk}
1629
  @param disk: the description of the disk to export
1630
  @type dest_node: str
1631
  @param dest_node: the destination node to export to
1632
  @type instance: L{objects.Instance}
1633
  @param instance: the instance object to whom the disk belongs
1634
  @type cluster_name: str
1635
  @param cluster_name: the cluster name, needed for SSH hostalias
1636
  @type idx: int
1637
  @param idx: the index of the disk in the instance's disk list,
1638
      used to export to the OS scripts environment
1639
  @rtype: boolean
1640
  @return: the success of the operation
1641

1642
  """
1643
  export_env = OSEnvironment(instance)
1644

    
1645
  inst_os = OSFromDisk(instance.os)
1646
  export_script = inst_os.export_script
1647

    
1648
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1649
                                     instance.name, int(time.time()))
1650
  if not os.path.exists(constants.LOG_OS_DIR):
1651
    os.mkdir(constants.LOG_OS_DIR, 0750)
1652
  real_disk = _RecursiveFindBD(disk)
1653
  if real_disk is None:
1654
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1655
                                  str(disk))
1656
  real_disk.Open()
1657

    
1658
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
1659
  export_env['EXPORT_INDEX'] = str(idx)
1660

    
1661
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1662
  destfile = disk.physical_id[1]
1663

    
1664
  # the target command is built out of three individual commands,
1665
  # which are joined by pipes; we check each individual command for
1666
  # valid parameters
1667
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1668
                               export_script, logfile)
1669

    
1670
  comprcmd = "gzip"
1671

    
1672
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1673
                                destdir, destdir, destfile)
1674
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1675
                                                   constants.GANETI_RUNAS,
1676
                                                   destcmd)
1677

    
1678
  # all commands have been checked, so we're safe to combine them
1679
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1680

    
1681
  result = utils.RunCmd(command, env=export_env)
1682

    
1683
  if result.failed:
1684
    logging.error("os snapshot export command '%s' returned error: %s"
1685
                  " output: %s", command, result.fail_reason, result.output)
1686
    return False
1687

    
1688
  return True
1689

    
1690

    
1691
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: boolean
  @return: the success of the operation

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  # instance-level metadata
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  for idx, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' % idx, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % idx, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % idx,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % len(instance.nics))

  # only disks with an actual snapshot are recorded and counted
  disk_total = 0
  for idx, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % idx,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % idx,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % idx,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # atomically replace any previous export with the new one
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  # both sections must be present for the export to be usable
  if not config.has_section(constants.INISECT_EXP):
    return None
  if not config.has_section(constants.INISECT_INS):
    return None

  return config
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
  """Import an os image into an instance.

  @type instance: L{objects.Instance}
  @param instance: instance to import the disks into
  @type src_node: string
  @param src_node: source node for the disk images
  @type src_images: list of string
  @param src_images: absolute paths of the disk images
  @type cluster_name: string
  @param cluster_name: the cluster name, needed to build the ssh command
  @rtype: list of boolean
  @return: each boolean represent the success of importing the n-th disk

  """
  import_env = OSEnvironment(instance)
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  # per-run log file, named after OS, instance and timestamp
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
                               import_script, logfile)

  final_result = []
  for idx, image in enumerate(src_images):
    if image:
      # the transfer pipeline is:
      #   ssh src_node 'cat image' | gunzip | import_script
      # i.e. the (compressed) image is streamed from the source node,
      # decompressed locally and fed to the OS-specific import script
      destcmd = utils.BuildShellCmd('cat %s', image)
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                       constants.GANETI_RUNAS,
                                                       destcmd)
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
      # tell the import script which block device and disk index this
      # iteration targets (DISK_%d_PATH was set up by OSEnvironment)
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
      import_env['IMPORT_INDEX'] = str(idx)
      result = utils.RunCmd(command, env=import_env)
      if result.failed:
        logging.error("Disk import command '%s' returned error: %s"
                      " output: %s", command, result.fail_reason,
                      result.output)
        final_result.append(False)
      else:
        final_result.append(True)
    else:
      # no source image for this disk: nothing to import, count as success
      final_result.append(True)

  return final_result
1828

    
1829

    
1830
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: the names of the exports found under the export directory,
      or an empty list if the directory does not exist

  """
  # with no export directory there is trivially nothing to list
  if not os.path.isdir(constants.EXPORT_DIR):
    return []
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
1841

    
1842

    
1843
def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: boolean
  @return: the success of the operation

  """
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.
  shutil.rmtree(os.path.join(constants.EXPORT_DIR, export))

  return True
1859

    
1860

    
1861
def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form  (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      # device not found on this node: record failure but keep going,
      # so that all other renames are still attempted
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        # the device path changed, so the old cache entry is now stale
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  return result
1895

    
1896

    
1897
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = os.path.normpath(cfg.GetFileStorageDir())
  # Note: os.path.commonprefix works character-by-character, so it would
  # accept e.g. '/srv/ganeti-evil' as being under '/srv/ganeti'; instead
  # require the path to be the base directory itself or a true
  # path-component descendant of it.
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
1920

    
1921

    
1922
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  # note: the trailing commas below build 1-tuples, matching the
  # documented (success,) return convention of this RPC family
  result = True,
  if not file_storage_dir:
    # path was rejected (not under the cluster base directory)
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
    else:
      try:
        os.makedirs(file_storage_dir, 0750)
      except OSError, err:
        logging.error("Cannot create file storage directory '%s': %s",
                      file_storage_dir, err)
        result = False,
  return result
1950

    
1951

    
1952
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  # trailing commas build the documented 1-tuple return value
  result = True,
  if not file_storage_dir:
    # path was rejected (not under the cluster base directory)
    result = False,
  elif os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      # previously we fell through and still called os.rmdir here,
      # producing a second, misleading error; now we stop at this check
      logging.error("'%s' is not a directory", file_storage_dir)
      result = False,
    else:
      # os.rmdir only removes empty directories; a non-empty directory
      # raises OSError, which we report as failure
      try:
        os.rmdir(file_storage_dir)
      except OSError:
        logging.exception("Cannot remove file storage directory '%s'",
                          file_storage_dir)
        result = False,
  # a non-existing (but valid) directory counts as success: nothing to do
  return result
1981

    
1982

    
1983
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  # trailing commas build the documented 1-tuple return value
  result = True,
  if not old_file_storage_dir or not new_file_storage_dir:
    # at least one path was rejected (not under the cluster base dir)
    result = False,
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError, err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result =  False,
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = False,
    else:
      # target already exists; this is only an error if the source also
      # still exists (otherwise the rename presumably already happened
      # and the operation is treated as an idempotent success)
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = False,
  return result
2018

    
2019

    
2020
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check

  @rtype: boolean
  @return: whether the file is under the queue directory

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  file_name = os.path.normpath(file_name)
  # Note: os.path.commonprefix compares character-by-character, so it
  # would wrongly accept a sibling directory sharing a name prefix
  # (e.g. '/var/lib/queue-other' vs '/var/lib/queue'); require a true
  # path-component prefix instead.
  result = (file_name == queue_dir or
            file_name.startswith(queue_dir + os.sep))

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
2037

    
2038

    
2039
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  # refuse to write anything outside the queue directory
  if not _IsJobQueueFile(file_name):
    return False

  # the payload may arrive compressed; _Decompress handles both forms,
  # and WriteFile replaces the target atomically
  utils.WriteFile(file_name, data=_Decompress(content))

  return True
2060

    
2061

    
2062
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: boolean
  @return: the success of the operation

  """
  # both the source and the destination must live inside the queue
  # directory; the checks short-circuit exactly as the combined form did
  if not _IsJobQueueFile(old):
    return False
  if not _IsJobQueueFile(new):
    return False

  # mkdir=True creates missing target (archive) directories on the fly
  utils.RenameFile(old, new, mkdir=True)

  return True
2081

    
2082

    
2083
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  The drain state is encoded by the mere existence of the drain file:
  setting the flag creates it (empty), clearing the flag removes it.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: boolean
  @return: always True
  @warning: the function always returns True

  """
  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  else:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)

  return True
2101

    
2102

    
2103
def CloseBlockDevices(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the succes of the operation, and message
      which will contain the error details in case we
      failed

  """
  # resolve all disks first: a single unresolvable disk aborts the
  # whole operation before any device is touched
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)

  # close all devices, collecting errors instead of stopping at the
  # first one, so the caller gets a complete picture
  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    if instance_name:
      # remove the per-instance /dev symlinks now that the devices are
      # no longer in use by this instance
      _RemoveBlockDevLinks(instance_name, disks)
    return (True, "All devices secondary")
2139

    
2140

    
2141
def ValidateHVParams(hvname, hvparams):
2142
  """Validates the given hypervisor parameters.
2143

2144
  @type hvname: string
2145
  @param hvname: the hypervisor name
2146
  @type hvparams: dict
2147
  @param hvparams: the hypervisor parameters to be validated
2148
  @rtype: tuple (success, message)
2149
  @return: a tuple of success and message, where success
2150
      indicates the succes of the operation, and message
2151
      which will contain the error details in case we
2152
      failed
2153

2154
  """
2155
  try:
2156
    hv_type = hypervisor.GetHypervisor(hvname)
2157
    hv_type.ValidateParameters(hvparams)
2158
    return (True, "Validation passed")
2159
  except errors.HypervisorError, err:
2160
    return (False, str(err))
2161

    
2162

    
2163
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  @rtype: tuple (success, message)
  @return: (False, reason) if the node must not be demoted,
      (True, "Done") otherwise

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    return (False, "ssconf status shows I'm the master node, will not demote")
  # also refuse if the master daemon is (still) running locally
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
    return (False, "The master daemon is running, will not demote")
  try:
    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    # a missing config file is fine (nothing to back up); any other
    # error aborts the demotion
    if err.errno != errno.ENOENT:
      return (False, "Error while backing up cluster file: %s" % str(err))
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
  return (True, "Done")
2181

    
2182

    
2183
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @return: (True, list of block devices) on success, otherwise
      (False, error message)

  """
  # fill in the physical IDs first, so the recursive lookup below can
  # match the devices on this node
  my_name = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for disk in disks:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      return (False, "Can't find device %s" % disk)
    bdevs.append(dev)
  return (True, bdevs)
2200

    
2201

    
2202
def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  @rtype: tuple (success, payload)
  @return: (True, message) on success, or (False, error details)

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    # propagate the error tuple from _FindDisks unchanged
    return status, bdevs

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      logging.exception("Failed to go into standalone mode")
      return (False, "Can't change network configuration: %s" % str(err))
  return (True, "All disks are now disconnected")
2218

    
2219

    
2220
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  @param nodes_ip: mapping used to resolve the disks' physical IDs
  @type disks: list of L{objects.Disk}
  @param disks: the DRBD disks to reconnect
  @type instance_name: str
  @param instance_name: used to create the device symlinks in
      multimaster mode
  @type multimaster: boolean
  @param multimaster: whether to also switch the devices to
      primary (multi-master) mode
  @rtype: tuple (success, message)

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    # propagate the error tuple from _FindDisks unchanged
    return status, bdevs

  if multimaster:
    # in multimaster mode the instance will run here, so it needs its
    # /dev symlinks in place before the devices are opened
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        return (False, "Can't create symlink: %s" % str(err))
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      return (False, "Can't change network configuration: %s" % str(err))
  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
  RECONNECT_TIMEOUT = 2 * 60
  sleep_time = 0.100 # start with 100 miliseconds
  timeout_limit = time.time() + RECONNECT_TIMEOUT
  while time.time() < timeout_limit:
    all_connected = True
    for rd in bdevs:
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        all_connected = False
      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.ReAttachNet(multimaster)
        except errors.BlockDeviceError, err:
          return (False, "Can't change network configuration: %s" % str(err))
    if all_connected:
      break
    # exponential backoff, capped at 5 seconds per iteration
    time.sleep(sleep_time)
    sleep_time = min(5, sleep_time * 1.5)
  if not all_connected:
    return (False, "Timeout in disk reconnecting")
  if multimaster:
    # change to primary mode
    for rd in bdevs:
      rd.Open()
  if multimaster:
    msg = "multi-master and primary"
  else:
    msg = "single-master"
  return (True, "Disks are now configured as %s" % msg)
2278

    
2279

    
2280
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @rtype: tuple
  @return: (False, error) if a device is neither connected nor
      resyncing, otherwise (True, (all_done, min_sync_percent))

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    # propagate the error tuple from _FindDisks unchanged
    return status, bdevs

  min_resync = 100
  alldone = True
  for dev in bdevs:
    stats = dev.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      # a device that is neither connected nor syncing is a failure;
      # report the progress figures gathered so far
      return (False, (alldone, min_resync))
    if stats.is_in_resync:
      alldone = False
    if stats.sync_percent is not None:
      # track the least-synchronized device
      min_resync = min(min_resync, stats.sync_percent)
  return (True, (alldone, min_resync))
2300

    
2301

    
2302
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  # only scripts matching this pattern are run (guards against editor
  # backup files and other stray entries in the hooks directory)
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    @type script: str
    @param script: the full path to the script
    @type env: dict
    @param env: the environment with which to exec the script
    @rtype: tuple (success, message)
    @return: a tuple of success and message, where success
        indicates the succes of the operation, and message
        which will contain the error details in case we
        failed

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      # give the hook an empty stdin so it cannot block on input
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # only the first 4096 bytes of combined stdout/stderr are kept
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          # retry wait() if it was interrupted by a signal
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # FIXME: must log output in case of failures
      # a missing hooks directory simply means no hooks to run
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # non-files, non-executables and names outside RE_MASK are
      # reported as skipped rather than executed
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
2429

    
2430

    
2431
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: four element tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from L{utils.RunResult})

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # the allocator reads its input from a temporary file, which is
    # removed once the script has finished
    tmp_fd, input_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(tmp_fd, idata)
      os.close(tmp_fd)
      result = utils.RunCmd([alloc_script, input_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(input_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2471

    
2472

    
2473
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  Each cached device gets one small file under C{_ROOT_DIR}; failures
  to maintain the cache are logged but never propagated, since the
  cache is purely advisory.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node nor not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError:
      # cache failures are non-fatal; just log and continue
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError:
      # cache failures are non-fatal; just log and continue
      # (message fixed: this used to say "update" due to a copy-paste)
      logging.exception("Can't remove bdev cache for %s", dev_path)