Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 8090e19f

History | View | Annotate | Download (18.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import os
25
import logging
26
import threading
27
import errno
28
import re
29
import time
30

    
31
from ganeti import constants
32
from ganeti import serializer
33
from ganeti import workerpool
34
from ganeti import opcodes
35
from ganeti import errors
36
from ganeti import mcpu
37
from ganeti import utils
38
from ganeti import rpc
39

    
40

    
41
JOBQUEUE_THREADS = 5
42

    
43

    
44
class _QueuedOpCode(object):
45
  """Encasulates an opcode object.
46

47
  Access is synchronized by the '_lock' attribute.
48

49
  The 'log' attribute holds the execution log and consists of tuples
50
  of the form (timestamp, level, message).
51

52
  """
53
  def __init__(self, op):
54
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])
55

    
56
  def __Setup(self, input_, status, result, log):
57
    self._lock = threading.Lock()
58
    self.input = input_
59
    self.status = status
60
    self.result = result
61
    self.log = log
62

    
63
  @classmethod
64
  def Restore(cls, state):
65
    obj = object.__new__(cls)
66
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
67
                state["status"], state["result"], state["log"])
68
    return obj
69

    
70
  @utils.LockedMethod
71
  def Serialize(self):
72
    return {
73
      "input": self.input.__getstate__(),
74
      "status": self.status,
75
      "result": self.result,
76
      "log": self.log,
77
      }
78

    
79
  @utils.LockedMethod
80
  def GetInput(self):
81
    """Returns the original opcode.
82

83
    """
84
    return self.input
85

    
86
  @utils.LockedMethod
87
  def SetStatus(self, status, result):
88
    """Update the opcode status and result.
89

90
    """
91
    self.status = status
92
    self.result = result
93

    
94
  @utils.LockedMethod
95
  def GetStatus(self):
96
    """Get the opcode status.
97

98
    """
99
    return self.status
100

    
101
  @utils.LockedMethod
102
  def GetResult(self):
103
    """Get the opcode result.
104

105
    """
106
    return self.result
107

    
108
  @utils.LockedMethod
109
  def Log(self, *args):
110
    """Append a log entry.
111

112
    """
113
    assert len(args) < 2
114

    
115
    if len(args) == 1:
116
      log_type = constants.ELOG_MESSAGE
117
      log_msg = args[0]
118
    else:
119
      log_type, log_msg = args
120
    self.log.append((time.time(), log_type, log_msg))
121

    
122
  @utils.LockedMethod
123
  def RetrieveLog(self, start_at=0):
124
    """Retrieve (a part of) the execution log.
125

126
    """
127
    return self.log[start_at:]
128

    
129

    
130
class _QueuedJob(object):
131
  """In-memory job representation.
132

133
  This is what we use to track the user-submitted jobs.
134

135
  """
136
  def __init__(self, storage, job_id, ops):
137
    if not ops:
138
      # TODO
139
      raise Exception("No opcodes")
140

    
141
    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
142

    
143
  def __Setup(self, storage, job_id, ops, run_op_index):
144
    self._lock = threading.Lock()
145
    self.storage = storage
146
    self.id = job_id
147
    self._ops = ops
148
    self.run_op_index = run_op_index
149

    
150
  @classmethod
151
  def Restore(cls, storage, state):
152
    obj = object.__new__(cls)
153
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
154
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
155
    return obj
156

    
157
  def Serialize(self):
158
    return {
159
      "id": self.id,
160
      "ops": [op.Serialize() for op in self._ops],
161
      "run_op_index": self.run_op_index,
162
      }
163

    
164
  def _SetStatus(self, status, msg):
165
    try:
166
      for op in self._ops:
167
        op.SetStatus(status, msg)
168
    finally:
169
      self.storage.UpdateJob(self)
170

    
171
  def SetUnclean(self, msg):
172
    return self._SetStatus(constants.OP_STATUS_ERROR, msg)
173

    
174
  def SetCanceled(self, msg):
175
    return self._SetStatus(constants.JOB_STATUS_CANCELED, msg)
176

    
177
  def GetStatus(self):
178
    status = constants.JOB_STATUS_QUEUED
179

    
180
    all_success = True
181
    for op in self._ops:
182
      op_status = op.GetStatus()
183
      if op_status == constants.OP_STATUS_SUCCESS:
184
        continue
185

    
186
      all_success = False
187

    
188
      if op_status == constants.OP_STATUS_QUEUED:
189
        pass
190
      elif op_status == constants.OP_STATUS_RUNNING:
191
        status = constants.JOB_STATUS_RUNNING
192
      elif op_status == constants.OP_STATUS_ERROR:
193
        status = constants.JOB_STATUS_ERROR
194
        # The whole job fails if one opcode failed
195
        break
196
      elif op_status == constants.OP_STATUS_CANCELED:
197
        status = constants.OP_STATUS_CANCELED
198
        break
199

    
200
    if all_success:
201
      status = constants.JOB_STATUS_SUCCESS
202

    
203
    return status
204

    
205
  @utils.LockedMethod
206
  def GetRunOpIndex(self):
207
    return self.run_op_index
208

    
209
  def Run(self, proc):
210
    """Job executor.
211

212
    This functions processes a this job in the context of given processor
213
    instance.
214

215
    Args:
216
    - proc: Ganeti Processor to run the job with
217

218
    """
219
    try:
220
      count = len(self._ops)
221
      for idx, op in enumerate(self._ops):
222
        try:
223
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
224

    
225
          self._lock.acquire()
226
          try:
227
            self.run_op_index = idx
228
          finally:
229
            self._lock.release()
230

    
231
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
232
          self.storage.UpdateJob(self)
233

    
234
          result = proc.ExecOpCode(op.input, op.Log)
235

    
236
          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
237
          self.storage.UpdateJob(self)
238
          logging.debug("Op %s/%s: Successfully finished %s",
239
                        idx + 1, count, op)
240
        except Exception, err:
241
          try:
242
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
243
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
244
          finally:
245
            self.storage.UpdateJob(self)
246
          raise
247

    
248
    except errors.GenericError, err:
249
      logging.exception("Ganeti exception")
250
    except:
251
      logging.exception("Unhandled exception")
252

    
253

    
254
class _JobQueueWorker(workerpool.BaseWorker):
255
  def RunTask(self, job):
256
    logging.debug("Worker %s processing job %s",
257
                  self.worker_id, job.id)
258
    # TODO: feedback function
259
    proc = mcpu.Processor(self.pool.context)
260
    try:
261
      job.Run(proc)
262
    finally:
263
      logging.debug("Worker %s finished job %s, status = %s",
264
                    self.worker_id, job.id, job.GetStatus())
265

    
266

    
267
class _JobQueueWorkerPool(workerpool.WorkerPool):
268
  def __init__(self, context):
269
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
270
                                              _JobQueueWorker)
271
    self.context = context
272

    
273

    
274
class JobStorageBase(object):
275
  def __init__(self, id_prefix):
276
    self.id_prefix = id_prefix
277

    
278
    if id_prefix:
279
      prefix_pattern = re.escape("%s-" % id_prefix)
280
    else:
281
      prefix_pattern = ""
282

    
283
    # Apart from the prefix, all job IDs are numeric
284
    self._re_job_id = re.compile(r"^%s\d+$" % prefix_pattern)
285

    
286
  def OwnsJobId(self, job_id):
287
    return self._re_job_id.match(job_id)
288

    
289
  def FormatJobID(self, job_id):
290
    if not isinstance(job_id, (int, long)):
291
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
292
    if job_id < 0:
293
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
294

    
295
    if self.id_prefix:
296
      prefix = "%s-" % self.id_prefix
297
    else:
298
      prefix = ""
299

    
300
    return "%s%010d" % (prefix, job_id)
301

    
302
  def _ShouldJobBeArchivedUnlocked(self, job):
303
    if job.GetStatus() not in (constants.JOB_STATUS_CANCELED,
304
                               constants.JOB_STATUS_SUCCESS,
305
                               constants.JOB_STATUS_ERROR):
306
      logging.debug("Job %s is not yet done", job.id)
307
      return False
308
    return True
309

    
310

    
311
class DiskJobStorage(JobStorageBase):
312
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
313

    
314
  def __init__(self, id_prefix):
315
    JobStorageBase.__init__(self, id_prefix)
316

    
317
    self._lock = threading.Lock()
318
    self._memcache = {}
319
    self._my_hostname = utils.HostInfo().name
320

    
321
    # Make sure our directories exists
322
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
323
      try:
324
        os.mkdir(path, 0700)
325
      except OSError, err:
326
        if err.errno not in (errno.EEXIST, ):
327
          raise
328

    
329
    # Get queue lock
330
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
331
    try:
332
      utils.LockFile(self.lock_fd)
333
    except:
334
      self.lock_fd.close()
335
      raise
336

    
337
    # Read version
338
    try:
339
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
340
    except IOError, err:
341
      if err.errno not in (errno.ENOENT, ):
342
        raise
343

    
344
      # Setup a new queue
345
      self._InitQueueUnlocked()
346

    
347
      # Try to open again
348
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
349

    
350
    try:
351
      # Try to read version
352
      version = int(version_fd.read(128))
353

    
354
      # Verify version
355
      if version != constants.JOB_QUEUE_VERSION:
356
        raise errors.JobQueueError("Found version %s, expected %s",
357
                                   version, constants.JOB_QUEUE_VERSION)
358
    finally:
359
      version_fd.close()
360

    
361
    self._last_serial = self._ReadSerial()
362
    if self._last_serial is None:
363
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
364
                                      " file")
365

    
366
  @staticmethod
367
  def _ReadSerial():
368
    """Try to read the job serial file.
369

370
    @rtype: None or int
371
    @return: If the serial can be read, then it is returned. Otherwise None
372
             is returned.
373

374
    """
375
    try:
376
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
377
      try:
378
        # Read last serial
379
        serial = int(serial_fd.read(1024).strip())
380
      finally:
381
        serial_fd.close()
382
    except (ValueError, EnvironmentError):
383
      serial = None
384

    
385
    return serial
386

    
387
  def Close(self):
388
    assert self.lock_fd, "Queue should be open"
389

    
390
    self.lock_fd.close()
391
    self.lock_fd = None
392

    
393
  def _InitQueueUnlocked(self):
394
    assert self.lock_fd, "Queue should be open"
395

    
396
    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
397
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
398
    if self._ReadSerial() is None:
399
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
400
                      data="%s\n" % 0)
401

    
402
  def _NewSerialUnlocked(self, nodes):
403
    """Generates a new job identifier.
404

405
    Job identifiers are unique during the lifetime of a cluster.
406

407
    Returns: A string representing the job identifier.
408

409
    """
410
    assert self.lock_fd, "Queue should be open"
411

    
412
    # New number
413
    serial = self._last_serial + 1
414

    
415
    # Write to file
416
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
417
                    data="%s\n" % serial)
418

    
419
    # Keep it only if we were able to write the file
420
    self._last_serial = serial
421

    
422
    # Distribute the serial to the other nodes
423
    try:
424
      nodes.remove(self._my_hostname)
425
    except ValueError:
426
      pass
427

    
428
    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
429
    for node in nodes:
430
      if not result[node]:
431
        logging.error("copy of job queue file to node %s failed", node)
432

    
433
    return self.FormatJobID(serial)
434

    
435
  def _GetJobPath(self, job_id):
436
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
437

    
438
  def _GetArchivedJobPath(self, job_id):
439
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
440

    
441
  def _ExtractJobID(self, name):
442
    m = self._RE_JOB_FILE.match(name)
443
    if m:
444
      return m.group(1)
445
    else:
446
      return None
447

    
448
  def _GetJobIDsUnlocked(self, archived=False):
449
    """Return all known job IDs.
450

451
    If the parameter archived is True, archived jobs IDs will be
452
    included. Currently this argument is unused.
453

454
    The method only looks at disk because it's a requirement that all
455
    jobs are present on disk (so in the _memcache we don't have any
456
    extra IDs).
457

458
    """
459
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
460
    jlist.sort()
461
    return jlist
462

    
463
  def _ListJobFiles(self):
464
    assert self.lock_fd, "Queue should be open"
465

    
466
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
467
            if self._RE_JOB_FILE.match(name)]
468

    
469
  def _LoadJobUnlocked(self, job_id):
470
    assert self.lock_fd, "Queue should be open"
471

    
472
    if job_id in self._memcache:
473
      logging.debug("Found job %s in memcache", job_id)
474
      return self._memcache[job_id]
475

    
476
    filepath = self._GetJobPath(job_id)
477
    logging.debug("Loading job from %s", filepath)
478
    try:
479
      fd = open(filepath, "r")
480
    except IOError, err:
481
      if err.errno in (errno.ENOENT, ):
482
        return None
483
      raise
484
    try:
485
      data = serializer.LoadJson(fd.read())
486
    finally:
487
      fd.close()
488

    
489
    job = _QueuedJob.Restore(self, data)
490
    self._memcache[job_id] = job
491
    logging.debug("Added job %s to the cache", job_id)
492
    return job
493

    
494
  def _GetJobsUnlocked(self, job_ids):
495
    if not job_ids:
496
      job_ids = self._GetJobIDsUnlocked()
497

    
498
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
499

    
500
  @utils.LockedMethod
501
  def GetJobs(self, job_ids):
502
    return self._GetJobsUnlocked(job_ids)
503

    
504
  @utils.LockedMethod
505
  def AddJob(self, ops, nodes):
506
    """Create and store on disk a new job.
507

508
    @type ops: list
509
    @param ops: The list of OpCodes that will become the new job.
510
    @type nodes: list
511
    @param nodes: The list of nodes to which the new job serial will be
512
                  distributed.
513

514
    """
515
    assert self.lock_fd, "Queue should be open"
516

    
517
    # Get job identifier
518
    job_id = self._NewSerialUnlocked(nodes)
519
    job = _QueuedJob(self, job_id, ops)
520

    
521
    # Write to disk
522
    self._UpdateJobUnlocked(job)
523

    
524
    logging.debug("Added new job %s to the cache", job_id)
525
    self._memcache[job_id] = job
526

    
527
    return job
528

    
529
  def _UpdateJobUnlocked(self, job):
530
    assert self.lock_fd, "Queue should be open"
531

    
532
    filename = self._GetJobPath(job.id)
533
    logging.debug("Writing job %s to %s", job.id, filename)
534
    utils.WriteFile(filename,
535
                    data=serializer.DumpJson(job.Serialize(), indent=False))
536
    self._CleanCacheUnlocked([job.id])
537

    
538
  def _CleanCacheUnlocked(self, exclude):
539
    """Clean the memory cache.
540

541
    The exceptions argument contains job IDs that should not be
542
    cleaned.
543

544
    """
545
    assert isinstance(exclude, list)
546
    for job in self._memcache.values():
547
      if job.id in exclude:
548
        continue
549
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
550
                                 constants.JOB_STATUS_RUNNING):
551
        logging.debug("Cleaning job %s from the cache", job.id)
552
        try:
553
          del self._memcache[job.id]
554
        except KeyError:
555
          pass
556

    
557
  @utils.LockedMethod
558
  def UpdateJob(self, job):
559
    return self._UpdateJobUnlocked(job)
560

    
561
  # TODO: Figure out locking
562
  #@utils.LockedMethod
563
  def CancelJob(self, job_id):
564
    """Cancels a job.
565

566
    @type job_id: string
567
    @param job_id: Job ID of job to be cancelled.
568

569
    """
570
    logging.debug("Cancelling job %s", job_id)
571

    
572
    self._lock.acquire()
573
    try:
574
      job = self._LoadJobUnlocked(job_id)
575
    finally:
576
      self._lock.release()
577
    if not job:
578
      logging.debug("Job %s not found", job_id)
579
      return
580

    
581
    if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,):
582
      logging.debug("Job %s is no longer in the queue", job.id)
583
      return
584

    
585
    job.SetCanceled("Job cancelled by request")
586

    
587
  @utils.LockedMethod
588
  def ArchiveJob(self, job_id):
589
    """Archives a job.
590

591
    @type job_id: string
592
    @param job_id: Job ID of job to be archived.
593

594
    """
595
    logging.debug("Archiving job %s", job_id)
596

    
597
    job = self._LoadJobUnlocked(job_id)
598
    if not job:
599
      logging.debug("Job %s not found", job_id)
600
      return
601

    
602
    if not self._ShouldJobBeArchivedUnlocked(job):
603
      return
604

    
605
    try:
606
      old = self._GetJobPath(job.id)
607
      new = self._GetArchivedJobPath(job.id)
608

    
609
      os.rename(old, new)
610

    
611
      logging.debug("Successfully archived job %s", job.id)
612
    finally:
613
      # Cleaning the cache because we don't know what os.rename actually did
614
      # and to be on the safe side.
615
      self._CleanCacheUnlocked([])
616

    
617

    
618
class JobQueue:
619
  """The job queue.
620

621
  """
622
  def __init__(self, context):
623
    self._lock = threading.Lock()
624
    self._jobs = DiskJobStorage("")
625
    self._wpool = _JobQueueWorkerPool(context)
626

    
627
    for job in self._jobs.GetJobs(None):
628
      status = job.GetStatus()
629
      if status in (constants.JOB_STATUS_QUEUED, ):
630
        self._wpool.AddTask(job)
631

    
632
      elif status in (constants.JOB_STATUS_RUNNING, ):
633
        logging.warning("Unfinished job %s found: %s", job.id, job)
634
        job.SetUnclean("Unclean master daemon shutdown")
635

    
636
  @utils.LockedMethod
637
  def SubmitJob(self, ops, nodes):
638
    """Add a new job to the queue.
639

640
    This enters the job into our job queue and also puts it on the new
641
    queue, in order for it to be picked up by the queue processors.
642

643
    @type ops: list
644
    @param ops: the sequence of opcodes that will become the new job
645
    @type nodes: list
646
    @param nodes: the list of nodes to which the queue should be
647
                  distributed
648

649
    """
650
    job = self._jobs.AddJob(ops, nodes)
651

    
652
    # Add to worker pool
653
    self._wpool.AddTask(job)
654

    
655
    return job.id
656

    
657
  def ArchiveJob(self, job_id):
658
    self._jobs.ArchiveJob(job_id)
659

    
660
  @utils.LockedMethod
661
  def CancelJob(self, job_id):
662
    self._jobs.CancelJob(job_id)
663

    
664
  def _GetJobInfo(self, job, fields):
665
    row = []
666
    for fname in fields:
667
      if fname == "id":
668
        row.append(job.id)
669
      elif fname == "status":
670
        row.append(job.GetStatus())
671
      elif fname == "ops":
672
        row.append([op.GetInput().__getstate__() for op in job._ops])
673
      elif fname == "opresult":
674
        row.append([op.GetResult() for op in job._ops])
675
      elif fname == "opstatus":
676
        row.append([op.GetStatus() for op in job._ops])
677
      elif fname == "ticker":
678
        ji = job.GetRunOpIndex()
679
        if ji < 0:
680
          lmsg = None
681
        else:
682
          lmsg = job._ops[ji].RetrieveLog(-1)
683
          # message might be empty here
684
          if lmsg:
685
            lmsg = lmsg[0]
686
          else:
687
            lmsg = None
688
        row.append(lmsg)
689
      else:
690
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
691
    return row
692

    
693
  def QueryJobs(self, job_ids, fields):
694
    """Returns a list of jobs in queue.
695

696
    Args:
697
    - job_ids: Sequence of job identifiers or None for all
698
    - fields: Names of fields to return
699

700
    """
701
    self._lock.acquire()
702
    try:
703
      jobs = []
704

    
705
      for job in self._jobs.GetJobs(job_ids):
706
        if job is None:
707
          jobs.append(None)
708
        else:
709
          jobs.append(self._GetJobInfo(job, fields))
710

    
711
      return jobs
712
    finally:
713
      self._lock.release()
714

    
715
  @utils.LockedMethod
716
  def Shutdown(self):
717
    """Stops the job queue.
718

719
    """
720
    self._wpool.TerminateWorkers()
721
    self._jobs.Close()