lib/jqueue.py @ d9f311d7
#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

import os
import logging
import threading
import errno
import re
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import rpc


JOBQUEUE_THREADS = 5


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

  """
  def __init__(self, op):
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])

  def __Setup(self, input_, status, result, log):
    self._lock = threading.Lock()
    self.input = input_
    self.status = status
    self.result = result
    self.log = log

  @classmethod
  def Restore(cls, state):
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"], state["log"])
    return obj

  @utils.LockedMethod
  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      }

  @utils.LockedMethod
  def GetInput(self):
    """Returns the original opcode.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result

  @utils.LockedMethod
  def Log(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args
    self.log.append((time.time(), log_type, log_msg))

  @utils.LockedMethod
  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    """
    return self.log[start_at:]
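
  # Illustrative sketch (not part of the original module): how a
  # _QueuedOpCode round-trips through Serialize()/Restore(). The opcode
  # class used here, opcodes.OpQueryClusterInfo, is only an assumed example.
  #
  #   qop = _QueuedOpCode(opcodes.OpQueryClusterInfo())
  #   qop.Log("starting")              # appends (timestamp, ELOG_MESSAGE, "starting")
  #   state = qop.Serialize()          # plain dict: input/status/result/log
  #   clone = _QueuedOpCode.Restore(state)
  #   assert clone.GetStatus() == constants.OP_STATUS_QUEUED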


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, storage, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)

  def __Setup(self, storage, job_id, ops, run_op_index):
    self._lock = threading.Lock()
    self.storage = storage
    self.id = job_id
    self._ops = ops
    self.run_op_index = run_op_index

  @classmethod
  def Restore(cls, storage, state):
    obj = object.__new__(cls)
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self._ops],
      "run_op_index": self.run_op_index,
      }
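
  # Illustrative sketch (not part of the original module): the dict returned
  # by Serialize() is what DiskJobStorage dumps as JSON into the job file on
  # disk, roughly (values abbreviated):
  #
  #   {
  #     "id": "0000000042",
  #     "ops": [{"input": ..., "status": "queued", "result": null, "log": []}],
  #     "run_op_index": -1
  #   }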

  def _SetStatus(self, status, msg):
    try:
      for op in self._ops:
        op.SetStatus(status, msg)
    finally:
      self.storage.UpdateJob(self)

  def SetUnclean(self, msg):
    return self._SetStatus(constants.OP_STATUS_ERROR, msg)

  def SetCanceled(self, msg):
    return self._SetStatus(constants.JOB_STATUS_CANCELED, msg)

  def GetStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
      op_status = op.GetStatus()
      if op_status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op_status == constants.OP_STATUS_QUEUED:
        pass
      elif op_status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op_status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op_status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
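
  # Illustrative sketch (not part of the original module): how the per-opcode
  # statuses are folded into one job status by GetStatus() above, assuming a
  # three-opcode job:
  #
  #   op statuses                    -> job status
  #   (success, success, success)    -> JOB_STATUS_SUCCESS
  #   (success, running, queued)     -> JOB_STATUS_RUNNING
  #   (success, error, queued)       -> JOB_STATUS_ERROR   (first error wins)
  #   (queued, queued, queued)       -> JOB_STATUS_QUEUED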

  @utils.LockedMethod
  def GetRunOpIndex(self):
    return self.run_op_index

  def Run(self, proc):
    """Job executor.

    This function processes this job in the context of the given
    processor instance.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    try:
      count = len(self._ops)
      for idx, op in enumerate(self._ops):
        try:
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

          self._lock.acquire()
          try:
            self.run_op_index = idx
          finally:
            self._lock.release()

          op.SetStatus(constants.OP_STATUS_RUNNING, None)
          self.storage.UpdateJob(self)

          result = proc.ExecOpCode(op.input, op.Log)

          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
          self.storage.UpdateJob(self)
          logging.debug("Op %s/%s: Successfully finished %s",
                        idx + 1, count, op)
        except Exception, err:
          try:
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
          finally:
            self.storage.UpdateJob(self)
          raise

    except errors.GenericError, err:
      logging.exception("ganeti exception")
    except Exception, err:
      logging.exception("unhandled exception")
    except:
      logging.exception("unhandled unknown exception")


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    proc = mcpu.Processor(self.pool.context)
    try:
      job.Run(proc)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, context):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.context = context


class JobStorageBase(object):
  def __init__(self, id_prefix):
    self.id_prefix = id_prefix

    if id_prefix:
      prefix_pattern = re.escape("%s-" % id_prefix)
    else:
      prefix_pattern = ""

    # Apart from the prefix, all job IDs are numeric
    self._re_job_id = re.compile(r"^%s\d+$" % prefix_pattern)

  def OwnsJobId(self, job_id):
    return self._re_job_id.match(job_id)

  def FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    if self.id_prefix:
      prefix = "%s-" % self.id_prefix
    else:
      prefix = ""

    return "%s%010d" % (prefix, job_id)
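
  # Illustrative sketch (not part of the original module): job IDs are the
  # serial number zero-padded to ten digits, with an optional prefix, e.g.:
  #
  #   JobStorageBase("").FormatJobID(42)     -> "0000000042"
  #   JobStorageBase("sub").FormatJobID(42)  -> "sub-0000000042"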

  def _ShouldJobBeArchivedUnlocked(self, job):
    if job.GetStatus() not in (constants.JOB_STATUS_CANCELED,
                               constants.JOB_STATUS_SUCCESS,
                               constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return False
    return True


class DiskJobStorage(JobStorageBase):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, id_prefix):
    JobStorageBase.__init__(self, id_prefix)

    self._lock = threading.Lock()
    self._memcache = {}
    self._my_hostname = utils.HostInfo().name

    # Make sure our directories exist
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
      try:
        os.mkdir(path, 0700)
      except OSError, err:
        if err.errno not in (errno.EEXIST, ):
          raise

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
    except:
      self.lock_fd.close()
      raise

    # Read version
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError, err:
      if err.errno not in (errno.ENOENT, ):
        raise

      # Setup a new queue
      self._InitQueueUnlocked()

      # Try to open again
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))

      # Verify version
      if version != constants.JOB_QUEUE_VERSION:
        raise errors.JobQueueError("Found version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))
    finally:
      version_fd.close()

    self._last_serial = self._ReadSerial()
    if self._last_serial is None:
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
                                      " file")

  @staticmethod
  def _ReadSerial():
    """Try to read the job serial file.

    @rtype: None or int
    @return: If the serial can be read, then it is returned. Otherwise None
             is returned.

    """
    try:
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
      try:
        # Read last serial
        serial = int(serial_fd.read(1024).strip())
      finally:
        serial_fd.close()
    except (ValueError, EnvironmentError):
      serial = None

    return serial

  def Close(self):
    assert self.lock_fd, "Queue should be open"

    self.lock_fd.close()
    self.lock_fd = None

  def _InitQueueUnlocked(self):
    assert self.lock_fd, "Queue should be open"

    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    if self._ReadSerial() is None:
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                      data="%s\n" % 0)

  def _NewSerialUnlocked(self, nodes):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    # Distribute the serial to the other nodes
    try:
      nodes.remove(self._my_hostname)
    except ValueError:
      pass

    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
    for node in nodes:
      if not result[node]:
        logging.error("copy of job queue file to node %s failed", node)

    return self.FormatJobID(serial)
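
  # Illustrative sketch (not part of the original module): with
  # _last_serial == 41 and an empty ID prefix, _NewSerialUnlocked() writes
  # "42\n" to the serial file, pushes that file to the other nodes via
  # rpc.call_upload_file, and returns the formatted ID "0000000042".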

  def _GetJobPath(self, job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  def _GetArchivedJobPath(self, job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  def _ExtractJobID(self, name):
    m = self._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist.sort()
    return jlist

  def _ListJobFiles(self):
    assert self.lock_fd, "Queue should be open"

    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    assert self.lock_fd, "Queue should be open"

    if job_id in self._memcache:
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  def GetJobs(self, job_ids):
    return self._GetJobsUnlocked(job_ids)

  @utils.LockedMethod
  def AddJob(self, ops, nodes):
    """Create and store a new job on disk.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @type nodes: list
    @param nodes: The list of nodes to which the new job serial will be
                  distributed.

    """
    assert self.lock_fd, "Queue should be open"

    # Get job identifier
    job_id = self._NewSerialUnlocked(nodes)
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self._UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  def _UpdateJobUnlocked(self, job):
    assert self.lock_fd, "Queue should be open"

    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))
    self._CleanCacheUnlocked([job.id])

  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    The exclude argument contains job IDs that should not be
    cleaned.

    """
    assert isinstance(exclude, list)
    for job in self._memcache.values():
      if job.id in exclude:
        continue
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
                                 constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  @utils.LockedMethod
  def UpdateJob(self, job):
    return self._UpdateJobUnlocked(job)

  # TODO: Figure out locking
  #@utils.LockedMethod
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    self._lock.acquire()
    try:
      job = self._LoadJobUnlocked(job_id)
    finally:
      self._lock.release()
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    job.SetCanceled("Job cancelled by request")

  @utils.LockedMethod
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if not self._ShouldJobBeArchivedUnlocked(job):
      return

    try:
      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)

      os.rename(old, new)

      logging.debug("Successfully archived job %s", job.id)
    finally:
      # Cleaning the cache because we don't know what os.rename actually did
      # and to be on the safe side.
      self._CleanCacheUnlocked([])
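
  # Illustrative sketch (not part of the original module): the on-disk
  # layout this class maintains, as used by the methods above:
  #
  #   constants.JOB_QUEUE_VERSION_FILE       - queue format version number
  #   constants.JOB_QUEUE_SERIAL_FILE        - last allocated serial, e.g. "42\n"
  #   constants.QUEUE_DIR/job-0000000042     - one JSON-serialized file per job
  #   constants.JOB_QUEUE_ARCHIVE_DIR/job-*  - finished jobs moved by ArchiveJob()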


class JobQueue:
  """The job queue.

  """
  def __init__(self, context):
    self._lock = threading.Lock()
    self._jobs = DiskJobStorage("")
    self._wpool = _JobQueueWorkerPool(context)

    for job in self._jobs.GetJobs(None):
      status = job.GetStatus()
      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask(job)

      elif status in (constants.JOB_STATUS_RUNNING, ):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")

  @utils.LockedMethod
  def SubmitJob(self, ops, nodes):
    """Add a new job to the queue.

    This enters the job into our job queue and also adds it to the worker
    pool, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: the sequence of opcodes that will become the new job
    @type nodes: list
    @param nodes: the list of nodes to which the queue should be
                  distributed

    """
    job = self._jobs.AddJob(ops, nodes)

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  def ArchiveJob(self, job_id):
    self._jobs.ArchiveJob(job_id)

  @utils.LockedMethod
  def CancelJob(self, job_id):
    self._jobs.CancelJob(job_id)

  def _GetJobInfo(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "ops":
        row.append([op.GetInput().__getstate__() for op in job._ops])
      elif fname == "opresult":
        row.append([op.GetResult() for op in job._ops])
      elif fname == "opstatus":
        row.append([op.GetStatus() for op in job._ops])
      elif fname == "ticker":
        ji = job.GetRunOpIndex()
        if ji < 0:
          lmsg = None
        else:
          lmsg = job._ops[ji].RetrieveLog(-1)
          # message might be empty here
          if lmsg:
            lmsg = lmsg[0]
          else:
            lmsg = None
        row.append(lmsg)
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in the queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    self._lock.acquire()
    try:
      jobs = []

      for job in self._jobs.GetJobs(job_ids):
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()

  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()
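
# Illustrative usage sketch (not part of the original module); the opcode
# class and node names are assumed examples only:
#
#   queue = JobQueue(context)
#   job_id = queue.SubmitJob([opcodes.OpQueryClusterInfo()],
#                            ["node1.example.com", "node2.example.com"])
#   queue.QueryJobs([job_id], ["id", "status", "opresult"])
#   queue.ArchiveJob(job_id)
#   queue.Shutdown()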