lib/jqueue.py @ fae737ac

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

import os
import logging
import threading
import errno
import re
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import rpc


JOBQUEUE_THREADS = 5


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

  """
  def __init__(self, op):
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])

  def __Setup(self, input_, status, result, log):
    self._lock = threading.Lock()
    self.input = input_
    self.status = status
    self.result = result
    self.log = log

  @classmethod
  def Restore(cls, state):
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"], state["log"])
    return obj

  @utils.LockedMethod
  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      }

  @utils.LockedMethod
  def GetInput(self):
    """Returns the original opcode.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result

  @utils.LockedMethod
  def Log(self, *args):
    """Append a log entry.

    """
    # The only valid call signatures are Log(msg) and Log(log_type, msg)
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args
    self.log.append((time.time(), log_type, log_msg))

  @utils.LockedMethod
  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    """
    return self.log[start_at:]


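# Illustrative sketch (not part of the original module) of how a
# _QueuedOpCode is typically driven; 'op' stands in for any
# opcodes.OpCode instance:
#
#   qop = _QueuedOpCode(op)
#   qop.SetStatus(constants.OP_STATUS_RUNNING, None)
#   qop.Log("one argument")                      # implies ELOG_MESSAGE
#   qop.Log(constants.ELOG_MESSAGE, "two args")  # explicit log type
#   qop.RetrieveLog()   # -> [(timestamp, log_type, message), ...]
#   state = qop.Serialize()                      # JSON-serializable dict
#   qop2 = _QueuedOpCode.Restore(state)

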
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, storage, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)

  def __Setup(self, storage, job_id, ops, run_op_index):
    self._lock = threading.Lock()
    self.storage = storage
    self.id = job_id
    self._ops = ops
    self.run_op_index = run_op_index

  @classmethod
  def Restore(cls, storage, state):
    obj = object.__new__(cls)
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self._ops],
      "run_op_index": self.run_op_index,
      }

  def SetUnclean(self, msg):
    try:
      for op in self._ops:
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
    finally:
      self.storage.UpdateJob(self)

  def GetStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
      op_status = op.GetStatus()
      if op_status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op_status == constants.OP_STATUS_QUEUED:
        pass
      elif op_status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op_status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

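  # Illustrative mapping (not code): how GetStatus above folds the
  # per-opcode statuses into a single job status:
  #
  #   [SUCCESS, SUCCESS, SUCCESS] -> JOB_STATUS_SUCCESS
  #   [SUCCESS, RUNNING, QUEUED]  -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, ...]       -> JOB_STATUS_ERROR (first error wins)
  #   [QUEUED, QUEUED]            -> JOB_STATUS_QUEUED
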
  @utils.LockedMethod
  def GetRunOpIndex(self):
    return self.run_op_index

  def Run(self, proc):
    """Job executor.

    This function processes the job in the context of the given
    processor instance.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    try:
      count = len(self._ops)
      for idx, op in enumerate(self._ops):
        try:
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

          self._lock.acquire()
          try:
            self.run_op_index = idx
          finally:
            self._lock.release()

          op.SetStatus(constants.OP_STATUS_RUNNING, None)
          self.storage.UpdateJob(self)

          result = proc.ExecOpCode(op.input, op.Log)

          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
          self.storage.UpdateJob(self)
          logging.debug("Op %s/%s: Successfully finished %s",
                        idx + 1, count, op)
        except Exception, err:
          try:
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
          finally:
            self.storage.UpdateJob(self)
          raise

    except errors.GenericError, err:
      logging.error("ganeti exception", exc_info=err)
    except Exception, err:
      logging.error("unhandled exception", exc_info=err)
    except:
      logging.error("unhandled unknown exception", exc_info=True)


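# Summary (not code) of the opcode lifecycle driven by _QueuedJob.Run above:
#
#   QUEUED -> RUNNING -> SUCCESS   (normal path)
#   QUEUED -> RUNNING -> ERROR     (ExecOpCode raised; the job stops there)
#
# After every transition the job is written back via storage.UpdateJob,
# so a restarted master daemon finds a consistent on-disk state.

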
class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    proc = mcpu.Processor(self.pool.context)
    try:
      job.Run(proc)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, context):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.context = context


class JobStorageBase(object):
  def __init__(self, id_prefix):
    self.id_prefix = id_prefix

    if id_prefix:
      prefix_pattern = re.escape("%s-" % id_prefix)
    else:
      prefix_pattern = ""

    # Apart from the prefix, all job IDs are numeric
    self._re_job_id = re.compile(r"^%s\d+$" % prefix_pattern)

  def OwnsJobId(self, job_id):
    return self._re_job_id.match(job_id)

  def FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    if self.id_prefix:
      prefix = "%s-" % self.id_prefix
    else:
      prefix = ""

    return "%s%010d" % (prefix, job_id)

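  # Illustrative examples (not part of the original module):
  #
  #   JobStorageBase("").FormatJobID(42)     -> "0000000042"
  #   JobStorageBase("foo").FormatJobID(42)  -> "foo-0000000042"
  #   JobStorageBase("foo").OwnsJobId("foo-0000000042")  -> match object
  #   JobStorageBase("foo").OwnsJobId("bar-0000000042")  -> None
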
  def _ShouldJobBeArchivedUnlocked(self, job):
    if job.GetStatus() not in (constants.JOB_STATUS_CANCELED,
                               constants.JOB_STATUS_SUCCESS,
                               constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return False
    return True


class DiskJobStorage(JobStorageBase):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, id_prefix):
    JobStorageBase.__init__(self, id_prefix)

    self._lock = threading.Lock()
    self._memcache = {}
    self._my_hostname = utils.HostInfo().name

    # Make sure our directories exist
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
      try:
        os.mkdir(path, 0700)
      except OSError, err:
        if err.errno not in (errno.EEXIST, ):
          raise

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
    except:
      self.lock_fd.close()
      raise

    # Read version
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError, err:
      if err.errno not in (errno.ENOENT, ):
        raise

      # Set up a new queue
      self._InitQueueUnlocked()

      # Try to open again
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))

      # Verify version
      if version != constants.JOB_QUEUE_VERSION:
        raise errors.JobQueueError("Found version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))
    finally:
      version_fd.close()

    self._last_serial = self._ReadSerial()
    if self._last_serial is None:
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
                                      " file")

  @staticmethod
  def _ReadSerial():
    """Try to read the job serial file.

    @rtype: None or int
    @return: If the serial can be read, then it is returned. Otherwise None
             is returned.

    """
    try:
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
      try:
        # Read last serial
        serial = int(serial_fd.read(1024).strip())
      finally:
        serial_fd.close()
    except (ValueError, EnvironmentError):
      serial = None

    return serial

  def Close(self):
    assert self.lock_fd, "Queue should be open"

    self.lock_fd.close()
    self.lock_fd = None

  def _InitQueueUnlocked(self):
    assert self.lock_fd, "Queue should be open"

    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    if self._ReadSerial() is None:
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                      data="%s\n" % 0)

  def _NewSerialUnlocked(self, nodes):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    # Distribute the serial to the other nodes
    try:
      nodes.remove(self._my_hostname)
    except ValueError:
      pass

    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
    for node in nodes:
      if not result[node]:
        logging.error("copy of job queue file to node %s failed", node)

    return self.FormatJobID(serial)

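  # Illustrative example (not part of the original module): if the serial
  # file contains "41\n", the next _NewSerialUnlocked call rewrites it as
  # "42\n", copies it to the given nodes via rpc.call_upload_file and
  # returns the formatted ID, e.g. "0000000042" with an empty ID prefix.
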
  def _GetJobPath(self, job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  def _GetArchivedJobPath(self, job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  def _ExtractJobID(self, name):
    m = self._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist.sort()
    return jlist

  def _ListJobFiles(self):
    assert self.lock_fd, "Queue should be open"

    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    assert self.lock_fd, "Queue should be open"

    if job_id in self._memcache:
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  def GetJobs(self, job_ids):
    return self._GetJobsUnlocked(job_ids)

  @utils.LockedMethod
  def AddJob(self, ops, nodes):
    """Create and store on disk a new job.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @type nodes: list
    @param nodes: The list of nodes to which the new job serial will be
                  distributed.

    """
    assert self.lock_fd, "Queue should be open"

    # Get job identifier
    job_id = self._NewSerialUnlocked(nodes)
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self._UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  def _UpdateJobUnlocked(self, job):
    assert self.lock_fd, "Queue should be open"

    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))
    self._CleanCacheUnlocked([job.id])

  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    The exclude argument contains job IDs that should not be
    cleaned.

    """
    assert isinstance(exclude, list)
    for job in self._memcache.values():
      if job.id in exclude:
        continue
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
                                 constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  @utils.LockedMethod
  def UpdateJob(self, job):
    return self._UpdateJobUnlocked(job)

  @utils.LockedMethod
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if not self._ShouldJobBeArchivedUnlocked(job):
      return

    try:
      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)

      os.rename(old, new)

      logging.debug("Successfully archived job %s", job.id)
    finally:
      # Clean the cache, since we don't know what os.rename actually
      # did, just to be on the safe side.
      self._CleanCacheUnlocked([])


class JobQueue:
  """The job queue.

  """
  def __init__(self, context):
    self._lock = threading.Lock()
    self._jobs = DiskJobStorage("")
    self._wpool = _JobQueueWorkerPool(context)

    for job in self._jobs.GetJobs(None):
      status = job.GetStatus()
      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask(job)

      elif status in (constants.JOB_STATUS_RUNNING, ):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")

  @utils.LockedMethod
  def SubmitJob(self, ops, nodes):
    """Add a new job to the queue.

    This enters the job into our job queue and also adds it to the
    worker pool, in order for it to be picked up by the queue
    processors.

    @type ops: list
    @param ops: the sequence of opcodes that will become the new job
    @type nodes: list
    @param nodes: the list of nodes to which the queue should be
                  distributed

    """
    job = self._jobs.AddJob(ops, nodes)

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

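  # Illustrative usage sketch (not part of the original module); 'context'
  # and the opcode below are hypothetical stand-ins:
  #
  #   queue = JobQueue(context)
  #   job_id = queue.SubmitJob([opcodes.OpQueryNodes(output_fields=["name"],
  #                                                  names=[])],
  #                            nodes)
  #   # -> e.g. "0000000042"; a pool worker picks the job up and runs it
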
  def ArchiveJob(self, job_id):
    self._jobs.ArchiveJob(job_id)

  def CancelJob(self, job_id):
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "ops":
        row.append([op.GetInput().__getstate__() for op in job._ops])
      elif fname == "opresult":
        row.append([op.GetResult() for op in job._ops])
      elif fname == "opstatus":
        row.append([op.GetStatus() for op in job._ops])
      elif fname == "ticker":
        ji = job.GetRunOpIndex()
        if ji < 0:
          lmsg = None
        else:
          lmsg = job._ops[ji].RetrieveLog(-1)
          # message might be empty here
          if lmsg:
            lmsg = lmsg[0]
          else:
            lmsg = None
        row.append(lmsg)
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    self._lock.acquire()
    try:
      jobs = []

      for job in self._jobs.GetJobs(job_ids):
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()

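  # Illustrative example (not part of the original module): querying two
  # fields for all jobs.
  #
  #   queue.QueryJobs(None, ["id", "status"])
  #   # -> e.g. [["0000000001", "success"], ["0000000002", "running"]]
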
  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()