Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / commissioning / specificator.py @ b4368e33

History | View | Annotate | Download (22.1 kB)

1
# -*- coding: utf-8 -*-
2
# Copyright 2012 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or
5
# without modification, are permitted provided that the following
6
# conditions are met:
7
#
8
#   1. Redistributions of source code must retain the above
9
#      copyright notice, this list of conditions and the following
10
#      disclaimer.
11
#
12
#   2. Redistributions in binary form must reproduce the above
13
#      copyright notice, this list of conditions and the following
14
#      disclaimer in the documentation and/or other materials
15
#      provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
#
30
# The views and conclusions contained in the software and
31
# documentation are those of the authors and should not be
32
# interpreted as representing official policies, either expressed
33
# or implied, of GRNET S.A.
34

    
35
from random import random, choice, randint
36
from math import log
37
from inspect import isclass
38
from kamaki.clients.commissioning.utils.argmap import (
39
    argmap_decode,
40
    argmap_check,
41
    argmap_unpack_dict,
42
    argmap_unpack_list)
43

    
44
try:
45
    from collections import OrderedDict
46
except ImportError:
47
    from kamaki.clients.commissioning.utils.ordereddict import OrderedDict
48

    
49
def shorts(s):
50
    if not isinstance(s, unicode):
51
        s = str(s)
52

    
53
    if len(s) <= 64:
54
        return s
55

    
56
    return s[:61] + '...'
57

    
58

    
59
class CanonifyException(Exception):
60
    pass
61

    
62
class SpecifyException(Exception):
63
    pass
64

    
65

    
66
class Canonical(object):
67

    
68
    _random_choice = None
69

    
70
    def __init__(self, *args, **kw):
71
        self.args = []
72
        named_args = []
73
        for a in args:
74
            if isinstance(a, tuple) and len(a) == 2:
75
                named_args.append(a)
76
            else:
77
                self.args.append(a)
78
        ordered_dict = OrderedDict(named_args)
79

    
80
        self.name = kw.pop('classname', self.__class__.__name__)
81
        random_choice = kw.pop('random', None)
82
        if random_choice is not None:
83
            self.random_choice = random_choice
84
        opts = {}
85
        for k, v in kw.items():
86
            if not isinstance(v, Canonical):
87
                if isclass(v) and issubclass(v, Canonical):
88
                    m = ("argument '%s': value '%s' is a Canonical _class_. "
89
                         "Perhaps you meant to specify a Canonical _instance_"
90
                         % (k, v))
91
                    raise SpecifyException(m)
92
                opts[k] = v
93
                del kw[k]
94

    
95
        self.opts = opts
96
        ordered_dict.update(kw)
97
        self.kw = ordered_dict
98
        self.init()
99

    
100
        if 'default' in opts:
101
            item = opts['default']
102
            if item is None:
103
                opts['null'] = 1
104
            else:
105
                opts['default'] = self._check(item)
106

    
107
    def init(self):
108
        return
109

    
110
    def __call__(self, item):
111
        return self.check(item)
112

    
113
    def check(self, item):
114
        if argmap_check(item):
115
            item = self._unpack(item)
116

    
117
        opts = self.opts
118
        if item is None and 'default' in opts:
119
            item = opts['default']
120

    
121
        can_be_null = opts.get('null', False)
122
        if item is None and can_be_null:
123
            return None
124

    
125
        return self._check(item)
126

    
127
    def _check(self, item):
128
        return item
129

    
130
    def _unpack(self, item):
131
        return argmap_unpack_list(item)
132

    
133
    def create(self):
134
        return None
135

    
136
    def random(self, **kw):
137
        random_choice = self._random_choice
138
        if random_choice is None:
139
            return None
140

    
141
        if callable(random_choice):
142
            return random_choice(kw)
143

    
144
        if isinstance(random_choice, str):
145
            return getattr(self, random_choice)(kw)
146

    
147
        return choice(random_choice)
148

    
149
    def tostring(self, depth=0, showopts=0, multiline=0):
150
        depth += 1
151
        if not multiline:
152
            argdepth = ''
153
            owndepth = ''
154
            joinchar = ','
155
            padchar = ''
156
        else:
157
            argdepth = '    ' * depth
158
            owndepth = '    ' * (depth - 1)
159
            joinchar = ',\n'
160
            padchar = '\n'
161

    
162
        args = [a.tostring( depth=depth,
163
                            showopts=showopts,
164
                            multiline=multiline) for a in self.args]
165
        args += [("%s=%s" %
166
                    (k, v.tostring( depth=depth,
167
                                    showopts=showopts,
168
                                    multiline=multiline)))
169

    
170
                                    for k, v in self.kw.items()]
171
        if showopts:
172
            args += [("%s=%s" % (k, str(v))) for k, v in self.opts.items()]
173

    
174
        if len(args) == 0:
175
            string = "%s(%s)" % (self.name, ','.join(args))
176
        else:
177
            string = "%s(%s" % (self.name, padchar)
178
            for arg in args:
179
                string += argdepth + arg + joinchar
180
            string = string[:-1] + padchar
181
            string += owndepth + ")"
182

    
183
        return string
184

    
185
    __str__ = tostring
186

    
187
    def __repr__(self):
188
        return self.tostring(multiline=0, showopts=1)
189

    
190
    def show(self):
191
        showable = self.opts.get('show', True)
192
        return self._show() if showable else ''
193

    
194
    def _show(self):
195
        return self.name
196

    
197
class Null(Canonical):
198

    
199
    def _check(self, item):
200
        return None
201

    
202
Nothing = Null()
203

    
204

    
205
class Integer(Canonical):
206

    
207
    def _check(self, item):
208
        try:
209
            num = long(item)
210
        except ValueError, e:
211
            try:
212
                num = long(item, 16)
213
            except Exception:
214
                m = "%s: cannot convert '%s' to long" % (self, shorts(item))
215
                raise CanonifyException(m)
216
        except TypeError, e:
217
            m = "%s: cannot convert '%s' to long" % (self, shorts(item))
218
            raise CanonifyException(m)
219

    
220
        optget = self.opts.get
221
        minimum = optget('minimum', None)
222
        maximum = optget('maximum', None)
223

    
224
        if minimum is not None and num < minimum:
225
            m = "%s: %d < minimum=%d" % (self, num, minimum)
226
            raise CanonifyException(m)
227

    
228
        if maximum is not None and num > maximum:
229
            m = "%s: %d > maximum=%d" % (self, num, maximum)
230
            raise CanonifyException(m)
231

    
232
        return num
233

    
234
    def _random_choice(self, kw):
235
        optget = self.opts.get
236
        kwget = kw.get
237
        minimum = kwget('minimum', optget('minimum', -4294967296L))
238
        maximum = kwget('maximum', optget('maximum', 4294967295L))
239
        r = random()
240
        if r < 0.1:
241
            return minimum
242
        if r < 0.2:
243
            return maximum
244
        if minimum <= 0 and maximum >= 0 and r < 0.3:
245
            return 0L
246
        return long(minimum + r * (maximum - minimum))
247

    
248

    
249

    
250
Serial = Integer(
251
            classname   =   'Serial',
252
            null        =   True,
253
)
254

    
255

    
256
class Text(Canonical):
257

    
258
    re = None
259
    matcher = None
260
    choices = None
261

    
262
    def init(self):
263
        opts = self.opts
264
        if 'regex' in opts:
265
            pat = opts['regex']
266
            re = self.re
267
            if re is None:
268
                import re
269
                self.re = re
270

    
271
            self.matcher = re.compile(pat, re.UNICODE)
272
            self.pat = pat
273

    
274
        if 'choices' in opts:
275
            opts['choices'] = dict((unicode(x), unicode(x))
276
                                    for x in opts['choices'])
277

    
278
    def _check(self, item):
279
        if not isinstance(item, unicode):
280
            # require non-unicode items to be utf8
281
            item = str(item)
282
            try:
283
                item = item.decode('utf8')
284
            except UnicodeDecodeError, e:
285
                item = item.decode('latin1')
286
                m = "%s: non-unicode '%s' is not utf8" % (self, shorts(item))
287
                raise CanonifyException(m)
288

    
289
        opts = self.opts
290
        if 'choices' in opts:
291
            choices = opts['choices']
292
            try:
293
                unknown = item not in choices
294
            except TypeError, e:
295
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
296
                raise CanonifyException(m, e)
297

    
298
            if unknown:
299
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
300
                raise CanonifyException(m)
301

    
302
            return choices[item]
303

    
304
        optget = opts.get
305
        itemlen = len(item)
306
        maxlen = optget('maxlen', None)
307
        if maxlen is not None and itemlen > maxlen:
308
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
309
            raise CanonifyException(m)
310

    
311
        minlen = optget('minlen', None)
312
        if minlen is not None and itemlen < minlen:
313
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
314
            raise CanonifyException(m)
315

    
316
        matcher = self.matcher
317
        if matcher is not None:
318
            match = matcher.match(item)
319
            if  (       match is None
320
                    or  (match.start(), match.end()) != (0, itemlen)    ):
321

    
322
                    m = ("%s: '%s' does not match '%s'"
323
                            % (self, shorts(item), self.pat))
324
                    raise CanonifyException(m)
325

    
326
        return item
327

    
328
    default_alphabet = '0123456789ฮฑฮฒฮณฮดฮตฮถ'.decode('utf8')
329

    
330
    def _random_choice(self, kw):
331
        opts = self.opts
332
        if 'regex' in opts:
333
            m = 'Unfortunately, random for regex strings not supported'
334
            raise ValueError(m)
335

    
336
        optget = opts.get
337
        kwget = kw.get
338
        minlen = kwget('minlen', optget('minlen', 0))
339
        maxlen = kwget('maxlen', optget('maxlen', 32))
340
        alphabet = kwget('alphabet', self.default_alphabet)
341
        z = maxlen - minlen
342
        if z < 1:
343
            z = 1
344

    
345
        g = log(z, 2)
346
        r = random() * g
347
        z = minlen + int(2**r)
348

    
349
        s = u''
350
        for _ in xrange(z):
351
            s += choice(alphabet)
352

    
353
        return s
354

    
355

    
356
class Bytes(Canonical):
357

    
358
    re = None
359
    matcher = None
360
    choices = None
361

    
362
    def init(self):
363
        opts = self.opts
364
        if 'regex' in opts:
365
            pat = opts['regex']
366
            re = self.re
367
            if re is None:
368
                import re
369
                self.re = re
370

    
371
            self.matcher = re.compile(pat)
372
            self.pat = pat
373

    
374
        if 'choices' in opts:
375
            opts['choices'] = dict((str(x), str(x))
376
                                    for x in opts['choices'])
377

    
378
    def _check(self, item):
379
        if isinstance(item, unicode):
380
            # convert unicode to utf8
381
            item = item.encode('utf8')
382

    
383
        opts = self.opts
384
        if 'choices' in opts:
385
            choices = opts['choices']
386
            try:
387
                unknown = item not in choices
388
            except TypeError, e:
389
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
390
                raise CanonifyException(m, e)
391

    
392
            if unknown:
393
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
394
                raise CanonifyException(m)
395

    
396
            return choices[item]
397

    
398
        optget = opts.get
399
        itemlen = len(item)
400
        maxlen = optget('maxlen', None)
401
        if maxlen is not None and itemlen > maxlen:
402
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
403
            raise CanonifyException(m)
404

    
405
        minlen = optget('minlen', None)
406
        if minlen is not None and itemlen < minlen:
407
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
408
            raise CanonifyException(m)
409

    
410
        matcher = self.matcher
411
        if matcher is not None:
412
            match = matcher.match(item)
413
            if  (       match is None
414
                    or  (match.start(), match.end()) != (0, itemlen)    ):
415

    
416
                    m = ("%s: '%s' does not match '%s'"
417
                            % (self, shorts(item), self.pat))
418
                    raise CanonifyException(m)
419

    
420
        return item
421

    
422
    default_alphabet = '0123456789abcdef'
423

    
424
    def _random_choice(self, kw):
425
        opts = self.opts
426
        if 'regex' in opts:
427
            m = 'Unfortunately, random for regex strings not supported'
428
            raise ValueError(m)
429

    
430
        optget = opts.get
431
        kwget = kw.get
432
        minlen = kwget('minlen', optget('minlen', 0))
433
        maxlen = kwget('maxlen', optget('maxlen', 32))
434
        alphabet = kwget('alphabet', self.default_alphabet)
435
        z = maxlen - minlen
436
        if z < 1:
437
            z = 1
438

    
439
        g = log(z, 2)
440
        r = random() * g
441
        z = minlen + int(2**r)
442

    
443
        s = u''
444
        for _ in xrange(z):
445
            s += choice(alphabet)
446

    
447
        return s
448

    
449

    
450
class ListOf(Canonical):
451

    
452
    def init(self):
453
        args = self.args
454
        kw = self.kw
455

    
456
        if not (args or kw):
457
            raise SpecifyException("ListOf requires one or more arguments")
458

    
459
        if args and kw:
460
            m = ("ListOf requires either positional "
461
                 "or keyword arguments, but not both")
462
            raise SpecifyException(m)
463

    
464
        if args:
465
            if len(args) > 1:
466
                self.canonical = Tuple(*args)
467
            else:
468
                self.canonical = args[0]
469
        else:
470
            self.canonical = Args(**kw)
471

    
472
    def _check(self, item):
473
        if item is None:
474
            item = ()
475

    
476
        try:
477
            items = iter(item)
478
        except TypeError, e:
479
            m = "%s: %s is not iterable" % (self, shorts(item))
480
            raise CanonifyException(m)
481

    
482
        canonical = self.canonical
483
        canonified = []
484
        append = canonified.append
485

    
486
        for item in items:
487
            item = canonical(item)
488
            append(item)
489

    
490
        if not canonified and self.opts.get('nonempty', False):
491
            m = "%s: must be nonempty" % (self,)
492
            raise CanonifyException(m)
493

    
494
        return canonified
495

    
496
    def _random_choice(self, kw):
497
        z = randint(1, 4)
498
        get_random = self.canonical.random
499

    
500
        return [get_random() for _ in xrange(z)]
501

    
502
    def _show(self):
503
        return '[ ' + self.canonical.show() + ' ... ]'
504

    
505
class Args(Canonical):
506

    
507
    def _unpack(self, item):
508
        arglist = argmap_unpack_dict(item)
509
        keys = self.kw.keys()
510
        arglen = len(arglist)
511
        if arglen != len(keys):
512
            m = "inconsistent number of parameters: %s != %s" % (
513
            arglen, len(keys))
514
            raise CanonifyException(m)
515

    
516
        position = 0
517
        named_args = OrderedDict()
518

    
519
        for k, v in arglist:
520
            if k is not None:
521
                named_args[k] = v
522
            else:
523
                # find the right position
524
                for i in range(position, arglen):
525
                    key = keys[i]
526
                    if not key in named_args.keys():
527
                       position = i + 1
528
                       break
529
                else:
530
                    m = "Formal arguments exhausted"
531
                    raise AssertionError(m)
532
                named_args[key] = v
533

    
534
        return named_args
535

    
536
    def _check(self, item):
537
        try:
538
            arglist = OrderedDict(item).items()
539
        except (TypeError, ValueError), e:
540
            m = "%s: %s is not dict-able" % (self, shorts(item))
541
            raise CanonifyException(m)
542

    
543
        canonified = OrderedDict()
544

    
545
        try:
546
            for n, c in self.kw.items():
547
                t = item[n] if n in item else None
548
                canonified[n] = c.check(t)
549
        except KeyError:
550
            m = ("%s: Argument '%s' not found in '%s'"
551
                 % (self, shorts(n), shorts(item)))
552
            raise CanonifyException(m)
553

    
554
        return canonified
555

    
556
    def _show(self):
557
        strings = [x for x in [c.show() for n, c in self.kw.items()] if x]
558
        return ' '.join(strings)
559

    
560
    def _random_choice(self, kw):
561
        args = {}
562
        for n, c in self.kw.items():
563
            args[n] = c.random()
564
        return args
565

    
566

    
567
class Tuple(Canonical):
568

    
569
    def _check(self, item):
570
        try:
571
            items = list(item)
572
        except TypeError, e:
573
            m = "%s: %s is not iterable" % (self, shorts(item))
574
            raise CanonifyException(m)
575

    
576
        canonicals = self.args
577
        zi = len(items)
578
        zc = len(canonicals)
579

    
580
        if zi != zc:
581
            m = "%s: expecting %d elements, not %d (%s)" % (self, zc, zi, str(items))
582
            raise CanonifyException(m)
583

    
584
        g = (canonical(element) for canonical, element in zip(self.args, item))
585

    
586
        return tuple(g)
587

    
588
    def __add__(self, other):
589
        oargs = other.args if isinstance(other, Tuple) else (other,)
590
        args = self.args + oargs
591
        return self.__class__(*args)
592

    
593
    def _random_choice(self, kw):
594
        return tuple(c.random() for c in self.args)
595

    
596
    def _show(self):
597
        canonicals = self.args
598
        strings = [x for x in [c.show() for c in canonicals] if x]
599
        return '[ ' + ' '.join(strings) + ' ]'
600

    
601
class Dict(Canonical):
602

    
603
    def _check(self, item):
604

    
605
        try:
606
            item = dict(item)
607
        except TypeError:
608
            m = "%s: '%s' is not dict-able" % (self, shorts(item))
609
            raise CanonifyException(m)
610

    
611
        canonified = {}
612
        canonical = self.kw
613

    
614
        for n, c in canonical.items():
615
            if n not in item:
616
                m = "%s: key '%s' not found" % (self, shorts(n))
617
                raise CanonifyException(m)
618
            canonified[n] = c(item[n])
619

    
620
        strict = self.opts.get('strict', True)
621
        if strict and len(item) != len(canonical):
622
            for k in sorted(item.keys()):
623
                if k not in canonical:
624
                    break
625

    
626
            m = "%s: unexpected key '%s' (strict mode)" % (self, shorts(k))
627
            raise CanonifyException(m)
628

    
629
        return canonified
630

    
631
    def _random_choice(self, kw):
632
        item = {}
633
        for n, c in self.kw.items():
634
            item[n] = c.random()
635

    
636
        return item
637

    
638

    
639
class Canonifier(object):
640
    def __init__(self, name, input_canonicals, output_canonicals, doc_strings):
641
        self.name = name
642
        self.input_canonicals = dict(input_canonicals)
643
        self.output_canonicals = dict(output_canonicals)
644
        self.doc_strings = dict(doc_strings)
645

    
646
    def call_names(self):
647
        return self.input_canonicals.keys()
648

    
649
    def call_docs(self):
650
        get_input_canonical = self.input_canonical
651
        for call_name, call_doc in self.doc_strings.iteritems():
652
            if not call_doc:
653
                canonical = get_input_canonical(call_name)
654
                call_doc = canonical.tostring(showopts=1, multiline=1)
655
            yield call_name, call_doc
656

    
657
    def get_doc(self, name):
658
        doc_strings = self.doc_strings
659
        if name not in doc_strings:
660
            m = "%s: Invalid method name '%s'" % (self.name, name)
661
            raise CanonifyException(m)
662

    
663
        docstring = doc_strings[name]
664
        if not docstring:
665
            docstring = self.input_canonical(name).tostring()
666
        return docstring
667

    
668
    def call_attrs(self):
669
        for call_name, canonical in self.input_canonicals.iteritems():
670
            yield call_name, canonical.tostring(showopts=1, multiline=1)
671

    
672
    def input_canonical(self, name):
673
        input_canonicals = self.input_canonicals
674
        if name not in input_canonicals:
675
            m = "%s: Invalid input call '%s'" % (self.name, name)
676
            raise CanonifyException(m)
677

    
678
        return input_canonicals[name]
679

    
680
    def canonify_input(self, name, the_input):
681
        return self.input_canonical(name)(the_input)
682

    
683
    def output_canonical(self, name):
684
        output_canonicals = self.output_canonicals
685
        if name not in output_canonicals:
686
            m = "%s: Output canonical '%s' does not exist" % (self.name, name)
687
            raise CanonifyException(m)
688

    
689
        return output_canonicals[name]
690

    
691
    def canonify_output(self, name, the_output):
692
        return self.output_canonical(name)(the_output)
693

    
694
    def show_input_canonical(self, name):
695
        return self.input_canonical(name).show()
696

    
697
    def parse(self, method, arglist):
698
        args, rest = argmap_decode(arglist)
699
        return self.input_canonical(method).check(args)
700

    
701

    
702
class Specificator(object):
703

    
704
    def __new__(cls):
705
        if cls is Specificator:
706
            m = "Specificator classes must be subclassed"
707
            raise SpecifyException(m)
708

    
709
        import inspect
710

    
711
        canonical_inputs = {}
712
        canonical_outputs = {}
713
        doc_strings = {}
714

    
715
        for name in dir(cls):
716
            f = getattr(cls, name)
717
            if not inspect.ismethod(f) or f.__name__.startswith('_'):
718
                continue
719

    
720
            doc_strings[name] = f.__doc__
721
            argspec = inspect.getargspec(f)
722
            defaults = argspec.defaults
723
            args = argspec.args
724
            if args and args[0] == 'self':
725
                args = args[1:]
726

    
727
            if not defaults:
728
                defaults = ()
729

    
730
            arglen = len(args)
731
            deflen = len(defaults)
732

    
733
            if arglen != deflen:
734
                a = (f.__name__, args[:arglen-deflen])
735
                m = "Unspecified arguments in '%s': %s" % a
736
                raise SpecifyException(m)
737

    
738
            args = zip(args, defaults)
739
            for a, c in args:
740
                if not isinstance(c, Canonical):
741
                    m = ("argument '%s=%s' is not an instance of 'Canonical'"
742
                         % (a, repr(c)))
743
                    raise SpecifyException(m)
744

    
745
            canonical = Null() if len(args) == 0 else Args(*args)
746
            canonical_inputs[name] = canonical
747

    
748
            self = object.__new__(cls)
749
            canonical = f(self)
750
            if not isinstance(canonical, Canonical):
751
                m = ("method '%s' does not return a Canonical, but a(n) %s "
752
                                                    % (name, type(canonical)))
753
                raise SpecifyException(m)
754
            canonical_outputs[name] = canonical
755

    
756
        return Canonifier(cls.__name__, canonical_inputs, canonical_outputs,
757
                          doc_strings)
758

    
759
    def __call__(self):
760
        return self
761