Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 5bdce580

History | View | Annotate | Download (17.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import os
25
import logging
26
import threading
27
import errno
28
import re
29
import time
30

    
31
from ganeti import constants
32
from ganeti import serializer
33
from ganeti import workerpool
34
from ganeti import opcodes
35
from ganeti import errors
36
from ganeti import mcpu
37
from ganeti import utils
38
from ganeti import rpc
39

    
40

    
41
JOBQUEUE_THREADS = 5
42

    
43

    
44
class _QueuedOpCode(object):
45
  """Encasulates an opcode object.
46

47
  Access is synchronized by the '_lock' attribute.
48

49
  The 'log' attribute holds the execution log and consists of tuples
50
  of the form (timestamp, level, message).
51

52
  """
53
  def __new__(cls, *args, **kwargs):
54
    obj = object.__new__(cls, *args, **kwargs)
55
    # Create a special lock for logging
56
    obj._log_lock = threading.Lock()
57
    return obj
58

    
59
  def __init__(self, op):
60
    self.input = op
61
    self.status = constants.OP_STATUS_QUEUED
62
    self.result = None
63
    self.log = []
64

    
65
  @classmethod
66
  def Restore(cls, state):
67
    obj = _QueuedOpCode.__new__(cls)
68
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
69
    obj.status = state["status"]
70
    obj.result = state["result"]
71
    obj.log = state["log"]
72
    return obj
73

    
74
  def Serialize(self):
75
    self._log_lock.acquire()
76
    try:
77
      return {
78
        "input": self.input.__getstate__(),
79
        "status": self.status,
80
        "result": self.result,
81
        "log": self.log,
82
        }
83
    finally:
84
      self._log_lock.release()
85

    
86
  def Log(self, *args):
87
    """Append a log entry.
88

89
    """
90
    assert len(args) < 3
91

    
92
    if len(args) == 1:
93
      log_type = constants.ELOG_MESSAGE
94
      log_msg = args[0]
95
    else:
96
      log_type, log_msg = args
97

    
98
    self._log_lock.acquire()
99
    try:
100
      self.log.append((time.time(), log_type, log_msg))
101
    finally:
102
      self._log_lock.release()
103

    
104
  def RetrieveLog(self, start_at=0):
105
    """Retrieve (a part of) the execution log.
106

107
    """
108
    self._log_lock.acquire()
109
    try:
110
      return self.log[start_at:]
111
    finally:
112
      self._log_lock.release()
113

    
114

    
115
class _QueuedJob(object):
116
  """In-memory job representation.
117

118
  This is what we use to track the user-submitted jobs.
119

120
  """
121
  def __init__(self, queue, job_id, ops):
122
    if not ops:
123
      # TODO
124
      raise Exception("No opcodes")
125

    
126
    self.queue = queue
127
    self.id = job_id
128
    self.ops = [_QueuedOpCode(op) for op in ops]
129
    self.run_op_index = -1
130

    
131
  @classmethod
132
  def Restore(cls, queue, state):
133
    obj = _QueuedJob.__new__(cls)
134
    obj.queue = queue
135
    obj.id = state["id"]
136
    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
137
    obj.run_op_index = state["run_op_index"]
138
    return obj
139

    
140
  def Serialize(self):
141
    return {
142
      "id": self.id,
143
      "ops": [op.Serialize() for op in self.ops],
144
      "run_op_index": self.run_op_index,
145
      }
146

    
147
  def CalcStatus(self):
148
    status = constants.JOB_STATUS_QUEUED
149

    
150
    all_success = True
151
    for op in self.ops:
152
      if op.status == constants.OP_STATUS_SUCCESS:
153
        continue
154

    
155
      all_success = False
156

    
157
      if op.status == constants.OP_STATUS_QUEUED:
158
        pass
159
      elif op.status == constants.OP_STATUS_RUNNING:
160
        status = constants.JOB_STATUS_RUNNING
161
      elif op.status == constants.OP_STATUS_ERROR:
162
        status = constants.JOB_STATUS_ERROR
163
        # The whole job fails if one opcode failed
164
        break
165
      elif op.status == constants.OP_STATUS_CANCELED:
166
        status = constants.OP_STATUS_CANCELED
167
        break
168

    
169
    if all_success:
170
      status = constants.JOB_STATUS_SUCCESS
171

    
172
    return status
173

    
174

    
175
class _JobQueueWorker(workerpool.BaseWorker):
176
  def RunTask(self, job):
177
    """Job executor.
178

179
    This functions processes a job.
180

181
    """
182
    logging.debug("Worker %s processing job %s",
183
                  self.worker_id, job.id)
184
    proc = mcpu.Processor(self.pool.queue.context)
185
    queue = job.queue
186
    try:
187
      try:
188
        count = len(job.ops)
189
        for idx, op in enumerate(job.ops):
190
          try:
191
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
192

    
193
            queue.acquire()
194
            try:
195
              job.run_op_index = idx
196
              op.status = constants.OP_STATUS_RUNNING
197
              op.result = None
198
              queue.UpdateJobUnlocked(job)
199

    
200
              input_opcode = op.input
201
            finally:
202
              queue.release()
203

    
204
            result = proc.ExecOpCode(input_opcode, op.Log)
205

    
206
            queue.acquire()
207
            try:
208
              op.status = constants.OP_STATUS_SUCCESS
209
              op.result = result
210
              queue.UpdateJobUnlocked(job)
211
            finally:
212
              queue.release()
213

    
214
            logging.debug("Op %s/%s: Successfully finished %s",
215
                          idx + 1, count, op)
216
          except Exception, err:
217
            queue.acquire()
218
            try:
219
              try:
220
                op.status = constants.OP_STATUS_ERROR
221
                op.result = str(err)
222
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
223
              finally:
224
                queue.UpdateJobUnlocked(job)
225
            finally:
226
              queue.release()
227
            raise
228

    
229
      except errors.GenericError, err:
230
        logging.exception("Ganeti exception")
231
      except:
232
        logging.exception("Unhandled exception")
233
    finally:
234
      queue.acquire()
235
      try:
236
        job_id = job.id
237
        status = job.CalcStatus()
238
      finally:
239
        queue.release()
240
      logging.debug("Worker %s finished job %s, status = %s",
241
                    self.worker_id, job_id, status)
242

    
243

    
244
class _JobQueueWorkerPool(workerpool.WorkerPool):
245
  def __init__(self, queue):
246
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
247
                                              _JobQueueWorker)
248
    self.queue = queue
249

    
250

    
251
class JobQueue(object):
252
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
253

    
254
  def __init__(self, context):
255
    self.context = context
256
    self._memcache = {}
257
    self._my_hostname = utils.HostInfo().name
258

    
259
    # Locking
260
    self._lock = threading.Lock()
261
    self.acquire = self._lock.acquire
262
    self.release = self._lock.release
263

    
264
    # Make sure our directories exists
265
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
266
      try:
267
        os.mkdir(path, 0700)
268
      except OSError, err:
269
        if err.errno not in (errno.EEXIST, ):
270
          raise
271

    
272
    # Get queue lock
273
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
274
    try:
275
      utils.LockFile(self.lock_fd)
276
    except:
277
      self.lock_fd.close()
278
      raise
279

    
280
    # Read version
281
    try:
282
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
283
    except IOError, err:
284
      if err.errno not in (errno.ENOENT, ):
285
        raise
286

    
287
      # Setup a new queue
288
      self._InitQueueUnlocked()
289

    
290
      # Try to open again
291
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
292

    
293
    try:
294
      # Try to read version
295
      version = int(version_fd.read(128))
296

    
297
      # Verify version
298
      if version != constants.JOB_QUEUE_VERSION:
299
        raise errors.JobQueueError("Found version %s, expected %s",
300
                                   version, constants.JOB_QUEUE_VERSION)
301
    finally:
302
      version_fd.close()
303

    
304
    self._last_serial = self._ReadSerial()
305
    if self._last_serial is None:
306
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
307
                                      " file")
308

    
309
    # Setup worker pool
310
    self._wpool = _JobQueueWorkerPool(self)
311

    
312
    # We need to lock here because WorkerPool.AddTask() may start a job while
313
    # we're still doing our work.
314
    self.acquire()
315
    try:
316
      for job in self._GetJobsUnlocked(None):
317
        status = job.CalcStatus()
318

    
319
        if status in (constants.JOB_STATUS_QUEUED, ):
320
          self._wpool.AddTask(job)
321

    
322
        elif status in (constants.JOB_STATUS_RUNNING, ):
323
          logging.warning("Unfinished job %s found: %s", job.id, job)
324
          try:
325
            for op in job.ops:
326
              op.status = constants.OP_STATUS_ERROR
327
              op.result = "Unclean master daemon shutdown"
328
          finally:
329
            self.UpdateJobUnlocked(job)
330
    finally:
331
      self.release()
332

    
333
  @staticmethod
334
  def _ReadSerial():
335
    """Try to read the job serial file.
336

337
    @rtype: None or int
338
    @return: If the serial can be read, then it is returned. Otherwise None
339
             is returned.
340

341
    """
342
    try:
343
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
344
      try:
345
        # Read last serial
346
        serial = int(serial_fd.read(1024).strip())
347
      finally:
348
        serial_fd.close()
349
    except (ValueError, EnvironmentError):
350
      serial = None
351

    
352
    return serial
353

    
354
  def _InitQueueUnlocked(self):
355
    assert self.lock_fd, "Queue should be open"
356

    
357
    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
358
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
359
    if self._ReadSerial() is None:
360
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
361
                      data="%s\n" % 0)
362

    
363
  def _FormatJobID(self, job_id):
364
    if not isinstance(job_id, (int, long)):
365
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
366
    if job_id < 0:
367
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
368

    
369
    return str(job_id)
370

    
371
  def _NewSerialUnlocked(self, nodes):
372
    """Generates a new job identifier.
373

374
    Job identifiers are unique during the lifetime of a cluster.
375

376
    Returns: A string representing the job identifier.
377

378
    """
379
    assert self.lock_fd, "Queue should be open"
380

    
381
    # New number
382
    serial = self._last_serial + 1
383

    
384
    # Write to file
385
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
386
                    data="%s\n" % serial)
387

    
388
    # Keep it only if we were able to write the file
389
    self._last_serial = serial
390

    
391
    # Distribute the serial to the other nodes
392
    try:
393
      nodes.remove(self._my_hostname)
394
    except ValueError:
395
      pass
396

    
397
    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
398
    for node in nodes:
399
      if not result[node]:
400
        logging.error("copy of job queue file to node %s failed", node)
401

    
402
    return self._FormatJobID(serial)
403

    
404
  @staticmethod
405
  def _GetJobPath(job_id):
406
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
407

    
408
  @staticmethod
409
  def _GetArchivedJobPath(job_id):
410
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
411

    
412
  @classmethod
413
  def _ExtractJobID(cls, name):
414
    m = cls._RE_JOB_FILE.match(name)
415
    if m:
416
      return m.group(1)
417
    else:
418
      return None
419

    
420
  def _GetJobIDsUnlocked(self, archived=False):
421
    """Return all known job IDs.
422

423
    If the parameter archived is True, archived jobs IDs will be
424
    included. Currently this argument is unused.
425

426
    The method only looks at disk because it's a requirement that all
427
    jobs are present on disk (so in the _memcache we don't have any
428
    extra IDs).
429

430
    """
431
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
432
    jlist.sort()
433
    return jlist
434

    
435
  def _ListJobFiles(self):
436
    assert self.lock_fd, "Queue should be open"
437

    
438
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
439
            if self._RE_JOB_FILE.match(name)]
440

    
441
  def _LoadJobUnlocked(self, job_id):
442
    assert self.lock_fd, "Queue should be open"
443

    
444
    if job_id in self._memcache:
445
      logging.debug("Found job %s in memcache", job_id)
446
      return self._memcache[job_id]
447

    
448
    filepath = self._GetJobPath(job_id)
449
    logging.debug("Loading job from %s", filepath)
450
    try:
451
      fd = open(filepath, "r")
452
    except IOError, err:
453
      if err.errno in (errno.ENOENT, ):
454
        return None
455
      raise
456
    try:
457
      data = serializer.LoadJson(fd.read())
458
    finally:
459
      fd.close()
460

    
461
    job = _QueuedJob.Restore(self, data)
462
    self._memcache[job_id] = job
463
    logging.debug("Added job %s to the cache", job_id)
464
    return job
465

    
466
  def _GetJobsUnlocked(self, job_ids):
467
    if not job_ids:
468
      job_ids = self._GetJobIDsUnlocked()
469

    
470
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
471

    
472
  @utils.LockedMethod
473
  def SubmitJob(self, ops, nodes):
474
    """Create and store a new job.
475

476
    This enters the job into our job queue and also puts it on the new
477
    queue, in order for it to be picked up by the queue processors.
478

479
    @type ops: list
480
    @param ops: The list of OpCodes that will become the new job.
481
    @type nodes: list
482
    @param nodes: The list of nodes to which the new job serial will be
483
                  distributed.
484

485
    """
486
    assert self.lock_fd, "Queue should be open"
487

    
488
    # Get job identifier
489
    job_id = self._NewSerialUnlocked(nodes)
490
    job = _QueuedJob(self, job_id, ops)
491

    
492
    # Write to disk
493
    self.UpdateJobUnlocked(job)
494

    
495
    logging.debug("Added new job %s to the cache", job_id)
496
    self._memcache[job_id] = job
497

    
498
    # Add to worker pool
499
    self._wpool.AddTask(job)
500

    
501
    return job.id
502

    
503
  def UpdateJobUnlocked(self, job):
504
    assert self.lock_fd, "Queue should be open"
505

    
506
    filename = self._GetJobPath(job.id)
507
    logging.debug("Writing job %s to %s", job.id, filename)
508
    utils.WriteFile(filename,
509
                    data=serializer.DumpJson(job.Serialize(), indent=False))
510
    self._CleanCacheUnlocked([job.id])
511

    
512
  def _CleanCacheUnlocked(self, exclude):
513
    """Clean the memory cache.
514

515
    The exceptions argument contains job IDs that should not be
516
    cleaned.
517

518
    """
519
    assert isinstance(exclude, list)
520

    
521
    for job in self._memcache.values():
522
      if job.id in exclude:
523
        continue
524
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
525
                                  constants.JOB_STATUS_RUNNING):
526
        logging.debug("Cleaning job %s from the cache", job.id)
527
        try:
528
          del self._memcache[job.id]
529
        except KeyError:
530
          pass
531

    
532
  @utils.LockedMethod
533
  def CancelJob(self, job_id):
534
    """Cancels a job.
535

536
    @type job_id: string
537
    @param job_id: Job ID of job to be cancelled.
538

539
    """
540
    logging.debug("Cancelling job %s", job_id)
541

    
542
    job = self._LoadJobUnlocked(job_id)
543
    if not job:
544
      logging.debug("Job %s not found", job_id)
545
      return
546

    
547
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
548
      logging.debug("Job %s is no longer in the queue", job.id)
549
      return
550

    
551
    try:
552
      for op in job.ops:
553
        op.status = constants.OP_STATUS_ERROR
554
        op.result = "Job cancelled by request"
555
    finally:
556
      self.UpdateJobUnlocked(job)
557

    
558
  @utils.LockedMethod
559
  def ArchiveJob(self, job_id):
560
    """Archives a job.
561

562
    @type job_id: string
563
    @param job_id: Job ID of job to be archived.
564

565
    """
566
    logging.debug("Archiving job %s", job_id)
567

    
568
    job = self._LoadJobUnlocked(job_id)
569
    if not job:
570
      logging.debug("Job %s not found", job_id)
571
      return
572

    
573
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
574
                                constants.JOB_STATUS_SUCCESS,
575
                                constants.JOB_STATUS_ERROR):
576
      logging.debug("Job %s is not yet done", job.id)
577
      return
578

    
579
    try:
580
      old = self._GetJobPath(job.id)
581
      new = self._GetArchivedJobPath(job.id)
582

    
583
      os.rename(old, new)
584

    
585
      logging.debug("Successfully archived job %s", job.id)
586
    finally:
587
      # Cleaning the cache because we don't know what os.rename actually did
588
      # and to be on the safe side.
589
      self._CleanCacheUnlocked([])
590

    
591
  def _GetJobInfoUnlocked(self, job, fields):
592
    row = []
593
    for fname in fields:
594
      if fname == "id":
595
        row.append(job.id)
596
      elif fname == "status":
597
        row.append(job.CalcStatus())
598
      elif fname == "ops":
599
        row.append([op.input.__getstate__() for op in job.ops])
600
      elif fname == "opresult":
601
        row.append([op.result for op in job.ops])
602
      elif fname == "opstatus":
603
        row.append([op.status for op in job.ops])
604
      elif fname == "ticker":
605
        ji = job.run_op_index
606
        if ji < 0:
607
          lmsg = None
608
        else:
609
          lmsg = job.ops[ji].RetrieveLog(-1)
610
          # message might be empty here
611
          if lmsg:
612
            lmsg = lmsg[0]
613
          else:
614
            lmsg = None
615
        row.append(lmsg)
616
      else:
617
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
618
    return row
619

    
620
  @utils.LockedMethod
621
  def QueryJobs(self, job_ids, fields):
622
    """Returns a list of jobs in queue.
623

624
    Args:
625
    - job_ids: Sequence of job identifiers or None for all
626
    - fields: Names of fields to return
627

628
    """
629
    jobs = []
630

    
631
    for job in self._GetJobsUnlocked(job_ids):
632
      if job is None:
633
        jobs.append(None)
634
      else:
635
        jobs.append(self._GetJobInfoUnlocked(job, fields))
636

    
637
    return jobs
638

    
639
  @utils.LockedMethod
640
  def Shutdown(self):
641
    """Stops the job queue.
642

643
    """
644
    assert self.lock_fd, "Queue should be open"
645

    
646
    self._wpool.TerminateWorkers()
647

    
648
    self.lock_fd.close()
649
    self.lock_fd = None