Source code for pyzor.message

"""This module contains the various messages used in the pyzor client-server
communication.
"""

import random
import email.message

import pyzor


class Message(email.message.Message):
    """Base class for the messages defined in this module.

    Construction runs the parent initialiser and then the ``setup`` hook.
    ``init_for_sending`` runs the ``ensure_complete`` validation hook.
    Both hooks are no-ops at this level and exist to be overridden.
    """

    def __init__(self):
        email.message.Message.__init__(self)
        # Give subclasses a chance to pre-populate headers.
        self.setup()

    def __str__(self):
        # Render through as_string() with its default arguments.  The
        # original note here reads: "The parent class adds the unix From
        # header."
        return self.as_string()

    def setup(self):
        """Hook called at the end of ``__init__``; does nothing here."""
        return None

    def ensure_complete(self):
        """Validation hook; does nothing here."""
        return None

    def init_for_sending(self):
        """Prepare the message for sending by running ``ensure_complete``."""
        self.ensure_complete()
class ThreadedMessage(Message):
    """Message that carries a 'Thread' header and a 'PV' header."""

    def get_protocol_version(self):
        """Return the 'PV' header converted to ``float``."""
        return float(self['PV'])

    def get_thread(self):
        """Return the 'Thread' header wrapped in a ``ThreadId``."""
        return ThreadId(self['Thread'])

    def set_thread(self, i):
        """Store ``str(i)`` under the 'Thread' header."""
        self['Thread'] = str(i)

    def ensure_complete(self):
        """Raise ``pyzor.IncompleteMessageError`` unless both the 'PV' and
        'Thread' headers are present, then run the parent check.
        """
        for required in ('PV', 'Thread'):
            if required not in self:
                raise pyzor.IncompleteMessageError(
                    "Doesn't have fields for a ThreadedMessage.")
        Message.ensure_complete(self)

    def init_for_sending(self):
        """Fill in the headers needed for sending.

        A thread id is generated only when no 'Thread' header exists yet;
        the 'PV' header is set from ``pyzor.proto_version``.
        """
        has_thread = 'Thread' in self
        if not has_thread:
            self.set_thread(ThreadId.generate())
        assert 'Thread' in self
        self["PV"] = str(pyzor.proto_version)
        Message.init_for_sending(self)
class Response(ThreadedMessage):
    """Threaded message that additionally carries 'Code' and 'Diag' headers."""

    # Value of the 'Code' header that is_ok() treats as success.
    ok_code = 200

    def get_code(self):
        """Return the 'Code' header converted to ``int``."""
        return int(self['Code'])

    def get_diag(self):
        """Return the raw 'Diag' header value."""
        return self['Diag']

    def head_tuple(self):
        """Return the pair ``(code, diag)``."""
        code = self.get_code()
        diag = self.get_diag()
        return code, diag

    def is_ok(self):
        """Return True when the code equals ``ok_code``."""
        return self.ok_code == self.get_code()

    def ensure_complete(self):
        """Raise ``pyzor.IncompleteMessageError`` unless both the 'Code' and
        'Diag' headers are present, then run the parent check.
        """
        for required in ('Code', 'Diag'):
            if required not in self:
                raise pyzor.IncompleteMessageError(
                    "doesn't have fields for a Response")
        ThreadedMessage.ensure_complete(self)
class Request(ThreadedMessage):
    """Class to use for reading in requests of any type.

    Subclasses that generate a message are responsible for setting the
    'Op' header themselves.
    """

    def ensure_complete(self):
        """Raise ``pyzor.IncompleteMessageError`` when the 'Op' header is
        missing, then run the parent check.
        """
        has_op = 'Op' in self
        if not has_op:
            raise pyzor.IncompleteMessageError(
                "doesn't have fields for a Request")
        ThreadedMessage.ensure_complete(self)

    def get_op(self):
        """Return the raw 'Op' header value."""
        return self['Op']
class ClientSideRequest(Request):
    """Request whose 'Op' header is filled in from the class attribute ``op``."""

    # Operation name written to the 'Op' header; subclasses override it.
    op = None

    def setup(self):
        """Run the parent setup, then set the 'Op' header to ``self.op``."""
        Request.setup(self)
        operation = self.op
        self["Op"] = operation
class SimpleDigestBasedRequest(ClientSideRequest):
    """Client request that carries 'Op-Digest' headers and counts them."""

    def __init__(self, digest=None):
        """Create the request, adding ``digest`` when it is truthy."""
        ClientSideRequest.__init__(self)
        self.digest_count = 0
        if not digest:
            return
        self.add_digest(digest)

    def add_digest(self, digest):
        """Add an 'Op-Digest' header for ``digest`` and bump ``digest_count``."""
        self.add_header("Op-Digest", digest)
        self.digest_count = self.digest_count + 1
class SimpleDigestSpecBasedRequest(SimpleDigestBasedRequest):
    """Digest-based request that can also carry an 'Op-Spec' header."""

    def __init__(self, digest=None, spec=None):
        """Create the request.

        When ``spec`` is truthy it is iterated as a sequence of sequences;
        every inner item is converted with ``str`` and the results are
        joined with commas into the 'Op-Spec' header.
        """
        SimpleDigestBasedRequest.__init__(self, digest)
        if not spec:
            return
        # Flatten one level of nesting before stringifying.
        flattened = []
        for group in spec:
            for item in group:
                flattened.append(item)
        self["Op-Spec"] = ",".join([str(part) for part in flattened])
class PingRequest(ClientSideRequest):
    """Client-side request whose ``op`` is "ping"."""

    op = "ping"
class PongRequest(SimpleDigestBasedRequest):
    """Digest-based request whose ``op`` is "pong"."""

    op = "pong"
class CheckRequest(SimpleDigestBasedRequest):
    """Digest-based request whose ``op`` is "check"."""

    op = "check"
class InfoRequest(SimpleDigestBasedRequest):
    """Digest-based request whose ``op`` is "info"."""

    op = "info"
class ReportRequest(SimpleDigestSpecBasedRequest):
    """Digest-and-spec-based request whose ``op`` is "report"."""

    op = "report"
class WhitelistRequest(SimpleDigestSpecBasedRequest):
    """Digest-and-spec-based request whose ``op`` is "whitelist"."""

    op = "whitelist"
class ThreadId(int):
    """Integer thread identifier restricted to ``full_range``.

    Construction raises ``ValueError`` for any value outside the half-open
    interval ``[full_range[0], full_range[1])``.
    """

    # (0, 1024) is reserved
    full_range = (0, 2 ** 16)
    ok_range = (1024, full_range[1])
    error_value = 0

    def __new__(cls, i):
        value = int.__new__(cls, i)
        lowest, limit = cls.full_range[0], cls.full_range[1]
        if value < lowest or value >= limit:
            raise ValueError("value outside of range")
        return value

    @classmethod
    def generate(cls):
        """Return a new id drawn with ``random.randrange`` from ``ok_range``."""
        candidate = random.randrange(*cls.ok_range)
        return cls(candidate)

    def in_ok_range(self):
        """Return True when this id lies in ``[ok_range[0], ok_range[1])``."""
        lowest, limit = self.ok_range[0], self.ok_range[1]
        return lowest <= self < limit