diff --git c/twisted/positioning/__init__.py w/twisted/positioning/__init__.py new file mode 100644 index 0000000..3454a64 --- /dev/null +++ w/twisted/positioning/__init__.py @@ -0,0 +1,7 @@ +# Copyright (c) 2009-2011 Twisted Matrix Laboratories. +# See LICENSE for details. +""" +The Twisted positioning framework. + +@since: 11.1 +""" diff --git c/twisted/positioning/base.py w/twisted/positioning/base.py new file mode 100644 index 0000000..1e707fb --- /dev/null +++ w/twisted/positioning/base.py @@ -0,0 +1,1083 @@ +# -*- test-case-name: twisted.positioning.test.test_base,twisted.positioning.test.test_sentence -*- +# Copyright (c) 2009-2011 Twisted Matrix Laboratories. +# See LICENSE for details. +""" +Generic positioning base classes. + +@since: 11.1 +""" +from zope.interface import implements +from twisted.python.util import FancyEqMixin + +from twisted.positioning import ipositioning + +MPS_PER_KNOT = 0.5144444444444444 +MPS_PER_KPH = 0.27777777777777777 +METERS_PER_FOOT = 0.3048 + +LATITUDE, LONGITUDE, HEADING, VARIATION = range(4) +NORTH, EAST, SOUTH, WEST = range(4) + + + +class BasePositioningReceiver(object): + """ + A base positioning receiver. + + This class would be a good base class for building positioning + receivers. It implements the interface (so you don't have to) with stub + methods. + + People who want to implement positioning receivers should subclass this + class and override the specific callbacks they want to handle. + """ + implements(ipositioning.IPositioningReceiver) + + def timeReceived(self, time): + """ + Implements L{IPositioningReceiver.timeReceived} stub. + """ + + + def headingReceived(self, heading): + """ + Implements L{IPositioningReceiver.headingReceived} stub. + """ + + + def speedReceived(self, speed): + """ + Implements L{IPositioningReceiver.speedReceived} stub. + """ + + + def climbReceived(self, climb): + """ + Implements L{IPositioningReceiver.climbReceived} stub. + """ + + + def positionReceived(self, latitude, longitude): + """ + Implements L{IPositioningReceiver.positionReceived} stub. + """ + + + def positionErrorReceived(self, positionError): + """ + Implements L{IPositioningReceiver.positioningErrorReceived} stub. + """ + + + def altitudeReceived(self, altitude): + """ + Implements L{IPositioningReceiver.altitudeReceived} stub. + """ + + + def beaconInformationReceived(self, beaconInformation): + """ + Implements L{IPositioningReceiver.beaconInformationReceived} stub. + """ + + + +class InvalidSentence(Exception): + """ + An exception raised when a sentence is invalid. + """ + + + +class InvalidChecksum(Exception): + """ + An exception raised when the checksum of a sentence is invalid. + """ + + +class BaseSentence(object): + """ + A base sentence class for a particular protocol. + + Using this base class, specific sentence classes can almost automatically + be created for a particular protocol (except for the documentation of + course) if that protocol implements the L{IPositioningSentenceProducer} + interface. To do this, fill the ALLOWED_ATTRIBUTES class attribute using + the C{getSentenceAttributes} class method of the producer:: + + class FooSentence(BaseSentence): + \"\"\" + A sentence for integalactic transmodulator sentences. + + @ivar transmogrificationConstant: The value used in the + transmogrifier while producing this sentence, corrected for + gravitational fields. + @type transmogrificationConstant: C{Tummy} + \"\"\" + ALLOWED_ATTRIBUTES = FooProtocol.getSentenceAttributes() + + @ivar presentAttribues: An iterable containing the names of the + attributes that are present in this sentence. + @type presentAttributes: iterable of C{str} + + @cvar ALLOWED_ATTRIBUTES: A set of attributes that are allowed in this + sentence. + @type ALLOWED_ATTRIBUTES: C{set} of C{str} + """ + ALLOWED_ATTRIBUTES = set() + + + def __init__(self, sentenceData): + """ + Initializes a sentence with parsed sentence data. + + @param sentenceData: The parsed sentence data. + @type sentenceData: C{dict} (C{str} -> C{str} or C{NoneType}) + """ + self._sentenceData = sentenceData + + + presentAttributes = property(lambda self: iter(self._sentenceData)) + + + def __getattr__(self, name): + """ + Gets an attribute of this sentence. + """ + if name in self.ALLOWED_ATTRIBUTES: + return self._sentenceData.get(name, None) + else: + className = self.__class__.__name__ + msg = "%s sentences have no %s attributes" % (className, name) + raise AttributeError(msg) + + + def __repr__(self): + """ + Returns a textual representation of this sentence. + + @return: A textual representation of this sentence. + @rtype: C{str} + """ + items = self._sentenceData.items() + data = ["%s: %s" % (k, v) for k, v in sorted(items) if k != "type"] + dataRepr = ", ".join(data) + + typeRepr = self._sentenceData.get("type") or "unknown type" + className = self.__class__.__name__ + + return "<%s (%s) {%s}>" % (className, typeRepr, dataRepr) + + + +class PositioningSentenceProducerMixin(object): + """ + A mixin for certain protocols that produce positioning sentences. + + This mixin helps protocols that have C{SENTENCE_CONTENTS} class variables + (such as the C{NMEAProtocol} and the C{ClassicGPSDProtocol}) implement the + L{IPositioningSentenceProducingProtocol} interface. + """ + #@classmethod + def getSentenceAttributes(cls): + """ + Returns a set of all attributes that might be found in the sentences + produced by this protocol. + + This is basically a set of all the attributes of all the sentences that + this protocol can produce. + + @return: The set of all possible sentence attribute names. + @rtype: C{set} of C{str} + """ + attributes = set(["type"]) + for attributeList in cls.SENTENCE_CONTENTS.values(): + for attribute in attributeList: + if attribute is None: + continue + attributes.add(attribute) + + return attributes + + + getSentenceAttributes = classmethod(getSentenceAttributes) + + + +class Angle(object, FancyEqMixin): + """ + An object representing an angle. + + @ivar inDecimalDegrees: The value of this angle, expressed in decimal + degrees. C{None} if unknown. This attribute is read-only. + @type inDecimalDegrees: C{float} (or C{NoneType}) + @ivar inDegreesMinutesSeconds: The value of this angle, expressed in + degrees, minutes and seconds. C{None} if unknown. This attribute is + read-only. + @type inDegreesMinutesSeconds: 3-C{tuple} of C{int} (or C{NoneType}) + + @cvar RANGE_EXPRESSIONS: A collections of expressions for the allowable + range for the angular value of a particular coordinate value. + @type RANGE_EXPRESSIONS: A mapping of coordinate types (C{LATITUDE}, + C{LONGITUDE}, C{HEADING}, C{VARIATION}) to 1-argument callables. + """ + RANGE_EXPRESSIONS = { + LATITUDE: lambda latitude: -90.0 < latitude < 90.0, + LONGITUDE: lambda longitude: -180.0 < longitude < 180.0, + HEADING: lambda heading: 0 <= heading < 360, + VARIATION: lambda variation: -180 < variation <= 180, + } + + + ANGLE_TYPE_NAMES = { + LATITUDE: "latitude", + LONGITUDE: "longitude", + VARIATION: "variation", + HEADING: "heading", + } + + + compareAttributes = 'angleType', 'inDecimalDegrees' + + + def __init__(self, angle=None, angleType=None): + """ + Initializes an angle. + + @param angle: The value of the angle in decimal degrees. (C{None} if + unknown). + @type angle: C{float} or C{NoneType} + @param angleType: A symbolic constant describing the angle type. Should + be one of LATITUDE, LONGITUDE, HEADING, VARIATION. C{None} if + unknown. + + @raises ValueError: If the angle type is not the default argument, but it + is an unknown type (it's not present in C{Angle.RANGE_EXPRESSIONS}), + or it is a known type but the supplied value was out of the allowable + range for said type. + """ + if angle is not None and angleType is not None: + if angleType not in self.RANGE_EXPRESSIONS: + raise ValueError("Unknown angle type") + elif not self.RANGE_EXPRESSIONS[angleType](angle): + raise ValueError("Angle %s not in allowed range for type %s" + % (angle, self.ANGLE_TYPE_NAMES[angleType])) + + self.angleType = angleType + self._angle = angle + + + inDecimalDegrees = property(lambda self: self._angle) + + + def _getDMS(self): + """ + Gets the value of this angle as a degrees, minutes, seconds tuple. + + @return: This angle expressed in degrees, minutes, seconds. C{None} if + the angle is unknown. + @rtype: 3-C{tuple} of C{int} (or C{NoneType}) + """ + if self._angle is None: + return None + + degrees = abs(int(self._angle)) + fractionalDegrees = abs(self._angle - int(self._angle)) + decimalMinutes = 60 * fractionalDegrees + + minutes = int(decimalMinutes) + fractionalMinutes = decimalMinutes - int(decimalMinutes) + decimalSeconds = 60 * fractionalMinutes + + return degrees, minutes, int(decimalSeconds) + + + inDegreesMinutesSeconds = property(_getDMS) + + + def setSign(self, sign): + """ + Sets the sign of this angle. + + @param sign: The new sign. C{1} for positive and C{-1} for negative + signs, respectively. + @type sign: C{int} + + @raise ValueError: If the C{sign} parameter is not C{-1} or C{1}. + """ + if sign not in (-1, 1): + raise ValueError("bad sign (got %s, expected -1 or 1)" % sign) + + self._angle = sign * abs(self._angle) + + + def __float__(self): + """ + Returns this angle as a float. + + @return: The float value of this angle, expressed in degrees. + @rtype: C{float} + """ + return self._angle + + + def __repr__(self): + """ + Returns a string representation of this angle. + + @return: The string representation. + @rtype: C{str} + """ + return "<%s (%s)>" % (self._angleTypeNameRepr, self._angleValueRepr) + + + def _getAngleValueRepr(self): + """ + Returns a string representation of the angular value of this angle. + + This is a helper function for the actual C{__repr__}. + + @return: The string representation. + @rtype: C{str} + """ + if self.inDecimalDegrees is not None: + return "%s degrees" % round(self.inDecimalDegrees, 2) + else: + return "unknown value" + + + _angleValueRepr = property(_getAngleValueRepr) + + + def _getAngleTypeNameRepr(self): + """ + Returns a string representation of the type of this angle. + + This is a helper function for the actual C{__repr__}. + + @return: The string representation. + @rtype: C{str} + """ + angleTypeName = self.ANGLE_TYPE_NAMES.get( + self.angleType, "angle of unknown type").capitalize() + return angleTypeName + + + _angleTypeNameRepr = property(_getAngleTypeNameRepr) + + + +class Heading(Angle): + """ + The heading of a mobile object. + + @ivar variation: The (optional) variation. + The sign of the variation is positive for variations towards the east + (clockwise from north), and negative for variations towards the west + (counterclockwise from north). + If the variation is unknown or not applicable, this is C{None}. + @type variation: C{Angle} or C{NoneType}. + @ivar correctedHeading: The heading, corrected for variation. If the + variation is unknown (C{None}), is None. This attribute is read-only (its + value is determined by the angle and variation attributes). The value is + coerced to being between 0 (inclusive) and 360 (exclusive). + """ + def __init__(self, angle=None, variation=None): + """ + Initializes a angle with an optional variation. + """ + Angle.__init__(self, angle, HEADING) + self.variation = variation + + + #@classmethod + def fromFloats(cls, angleValue=None, variationValue=None): + """ + Constructs a Heading from the float values of the angle and variation. + + @param angleValue: The angle value of this heading. + @type angleValue: C{float} + @param variationValue: The value of the variation of this heading. + @type variationValue: C{float} + """ + variation = Angle(variationValue, VARIATION) + return cls(angleValue, variation) + + + fromFloats = classmethod(fromFloats) + + + def _getCorrectedHeading(self): + """ + Corrects the heading by the given variation. This is sometimes known as + the true heading. + + @return: The heading, corrected by the variation. If the variation or + the angle are unknown, returns C{None}. + @rtype: C{float} or C{NoneType} + """ + if self._angle is None or self.variation is None: + return None + + angle = (self.inDecimalDegrees - self.variation.inDecimalDegrees) % 360 + return Angle(angle, HEADING) + + + correctedHeading = property(_getCorrectedHeading) + + + def setSign(self, sign): + """ + Sets the sign of the variation of this heading. + + @param sign: The new sign. C{1} for positive and C{-1} for negative + signs, respectively. + @type sign: C{int} + + @raise ValueErorr: If the C{sign} parameter is not C{-1} or C{1}. + """ + if self.variation.inDecimalDegrees is None: + raise ValueError("can't set the sign of an unknown variation") + + self.variation.setSign(sign) + + + compareAttributes = list(Angle.compareAttributes) + ["variation"] + + + def __repr__(self): + """ + Returns a string representation of this angle. + + @return: The string representation. + @rtype: C{str} + """ + if self.variation is None: + variationRepr = "unknown variation" + else: + variationRepr = repr(self.variation) + + return "<%s (%s, %s)>" % ( + self._angleTypeNameRepr, self._angleValueRepr, variationRepr) + + + +class Coordinate(Angle, FancyEqMixin): + """ + A coordinate. + + @ivar angle: The value of the coordinate in decimal degrees, with the usual + rules for sign (northern and eastern hemispheres are positive, southern + and western hemispheres are negative). + @type angle: C{float} + """ + def __init__(self, angle, coordinateType=None): + """ + Initializes a coordinate. + + @param angle: The angle of this coordinate in decimal degrees. The + hemisphere is determined by the sign (north and east are positive). + If this coordinate describes a latitude, this value must be within + -90.0 and +90.0 (exclusive). If this value describes a longitude, + this value must be within -180.0 and +180.0 (exclusive). + @type angle: C{float} + @param coordinateType: One of L{LATITUDE}, L{LONGITUDE}. Used to return + hemisphere names. + """ + Angle.__init__(self, angle, coordinateType) + + + HEMISPHERES_BY_TYPE_AND_SIGN = { + LATITUDE: [ + NORTH, # positive + SOUTH, # negative + ], + + LONGITUDE: [ + EAST, # positve + WEST, # negative + ] + } + + + def _getHemisphere(self): + """ + Gets the hemisphere of this coordinate. + + @return: A symbolic constant representing a hemisphere (C{NORTH}, + C{EAST}, C{SOUTH} or C{WEST}). + """ + try: + sign = int(self.inDecimalDegrees < 0) + return self.HEMISPHERES_BY_TYPE_AND_SIGN[self.angleType][sign] + except KeyError: + raise ValueError("unknown coordinate type (cant find hemisphere)") + + + hemisphere = property(fget=_getHemisphere) + + + +class Altitude(object, FancyEqMixin): + """ + An altitude. + + @ivar inMeters: The altitude represented by this object, in meters. This + attribute is read-only. + @type inMeters: C{float} + + @ivar inFeet: As above, but expressed in feet. + @type inFeet: C{float} + """ + compareAttributes = 'inMeters', + + def __init__(self, altitude): + """ + Initializes an altitude. + + @param altitude: The altitude in meters. + @type altitude: C{float} + """ + self._altitude = altitude + + + def _getAltitudeInFeet(self): + """ + Gets the altitude this object represents, in feet. + + @return: The altitude, expressed in feet. + @rtype: C{float} + """ + return self._altitude / METERS_PER_FOOT + + + inFeet = property(_getAltitudeInFeet) + + + def _getAltitudeInMeters(self): + """ + Returns the altitude this object represents, in meters. + + @return: The altitude, expressed in feet. + @rtype: C{float} + """ + return self._altitude + + + inMeters = property(_getAltitudeInMeters) + + + def __float__(self): + """ + Returns the altitude represented by this object expressed in meters. + + @return: The altitude represented by this object, expressed in meters. + @rtype: C{float} + """ + return self._altitude + + + def __repr__(self): + """ + Returns a string representation of this altitude. + + @return: The string representation. + @rtype: C{str} + """ + return "" % (self._altitude,) + + + +class _BaseSpeed(object, FancyEqMixin): + """ + An object representing the abstract concept of the speed (rate of + movement) of a mobile object. + + This primarily has behavior for converting between units and comparison. + + @ivar inMetersPerSecond: The speed that this object represents, expressed + in meters per second. This attribute is immutable. + @type inMetersPerSecond: C{float} + + @ivar inKnots: Same as above, but expressed in knots. + @type inKnots: C{float} + """ + compareAttributes = 'inMetersPerSecond', + + def __init__(self, speed): + """ + Initializes a speed. + + @param speed: The speed that this object represents, expressed in + meters per second. + @type speed: C{float} + + @raises ValueError: Raised if value was invalid for this particular + kind of speed. Only happens in subclasses. + """ + self._speed = speed + + + def _getSpeedInKnots(self): + """ + Returns the speed represented by this object, expressed in knots. + + @return: The speed this object represents, in knots. + @rtype: C{float} + """ + return self._speed / MPS_PER_KNOT + + + inKnots = property(_getSpeedInKnots) + + + inMetersPerSecond = property(lambda self: self._speed) + + + def __float__(self): + """ + Returns the speed represented by this object expressed in meters per + second. + + @return: The speed represented by this object, expressed in meters per + second. + @rtype: C{float} + """ + return self._speed + + + def __repr__(self): + """ + Returns a string representation of this speed object. + + @return: The string representation. + @rtype: C{str} + """ + speedValue = round(self.inMetersPerSecond, 2) + return "<%s (%s m/s)>" % (self.__class__.__name__, speedValue) + + + +class Speed(_BaseSpeed): + """ + The speed (rate of movement) of a mobile object. + """ + def __init__(self, speed): + """ + Initializes a L{Speed} object. + + @param speed: The speed that this object represents, expressed in + meters per second. + @type speed: C{float} + + @raises ValueError: Raised if C{speed} is negative. + """ + if speed < 0: + raise ValueError("negative speed: %r" % (speed,)) + + _BaseSpeed.__init__(self, speed) + + + +class Climb(_BaseSpeed): + """ + The climb ("vertical speed") of an object. + """ + def __init__(self, climb): + """ + Initializes a L{Clib} object. + + @param climb: The climb that this object represents, expressed in + meters per second. + @type climb: C{float} + + @raises ValueError: Raised if the provided climb was less than zero. + """ + _BaseSpeed.__init__(self, climb) + + + +class PositionError(object, FancyEqMixin): + """ + Position error information. + + @ivar pdop: The position dilution of precision. C{None} if unknown. + @type pdop: C{float} or C{NoneType} + @ivar hdop: The horizontal dilution of precision. C{None} if unknown. + @type hdop: C{float} or C{NoneType} + @ivar vdop: The vertical dilution of precision. C{None} if unknown. + @type vdop: C{float} or C{NoneType} + """ + compareAttributes = 'pdop', 'hdop', 'vdop' + + def __init__(self, pdop=None, hdop=None, vdop=None, testInvariant=False): + """ + Initializes a positioning error object. + + @param pdop: The position dilution of precision. C{None} if unknown. + @type pdop: C{float} or C{NoneType} + @param hdop: The horizontal dilution of precision. C{None} if unknown. + @type hdop: C{float} or C{NoneType} + @param vdop: The vertical dilution of precision. C{None} if unknown. + @type vdop: C{float} or C{NoneType} + @param testInvariant: Flag to test if the DOP invariant is valid or + not. If C{True}, the invariant (PDOP = (HDOP**2 + VDOP**2)*.5) is + checked at every mutation. By default, this is false, because the + vast majority of DOP-providing devices ignore this invariant. + @type testInvariant: c{bool} + """ + self._pdop = pdop + self._hdop = hdop + self._vdop = vdop + + self._testInvariant = testInvariant + self._testDilutionOfPositionInvariant() + + + ALLOWABLE_TRESHOLD = 0.01 + + + def _testDilutionOfPositionInvariant(self): + """ + Tests if this positioning error object satisfies the dilution of + position invariant (PDOP = (HDOP**2 + VDOP**2)*.5), unless the + C{self._testInvariant} instance variable is C{False}. + + @return: C{None} if the invariant was not satisifed or not tested. + @raises ValueError: Raised if the invariant was tested but not + satisfied. + """ + if not self._testInvariant: + return + + for x in (self.pdop, self.hdop, self.vdop): + if x is None: + return + + delta = abs(self.pdop - (self.hdop**2 + self.vdop**2)**.5) + if delta > self.ALLOWABLE_TRESHOLD: + raise ValueError("invalid combination of dilutions of precision: " + "position: %s, horizontal: %s, vertical: %s" + % (self.pdop, self.hdop, self.vdop)) + + + DOP_EXPRESSIONS = { + 'pdop': [ + lambda self: float(self._pdop), + lambda self: (self._hdop**2 + self._vdop**2)**.5, + ], + + 'hdop': [ + lambda self: float(self._hdop), + lambda self: (self._pdop**2 - self._vdop**2)**.5, + ], + + 'vdop': [ + lambda self: float(self._vdop), + lambda self: (self._pdop**2 - self._hdop**2)**.5, + ], + } + + + def _getDOP(self, dopType): + """ + Gets a particular dilution of position value. + + @return: The DOP if it is known, C{None} otherwise. + @rtype: C{float} or C{NoneType} + """ + for dopExpression in self.DOP_EXPRESSIONS[dopType]: + try: + return dopExpression(self) + except TypeError: + continue + + + def _setDOP(self, dopType, value): + """ + Sets a particular dilution of position value. + + @param dopType: The type of dilution of position to set. One of + ('pdop', 'hdop', 'vdop'). + @type dopType: C{str} + + @param value: The value to set the dilution of position type to. + @type value: C{float} + + If this position error tests dilution of precision invariants, + it will be checked. If the invariant is not satisfied, the + assignment will be undone and C{ValueError} is raised. + """ + attributeName = "_" + dopType + + oldValue = getattr(self, attributeName) + setattr(self, attributeName, float(value)) + + try: + self._testDilutionOfPositionInvariant() + except ValueError: + setattr(self, attributeName, oldValue) + raise + + + pdop = property(fget=lambda self: self._getDOP('pdop'), + fset=lambda self, value: self._setDOP('pdop', value)) + + + hdop = property(fget=lambda self: self._getDOP('hdop'), + fset=lambda self, value: self._setDOP('hdop', value)) + + + vdop = property(fget=lambda self: self._getDOP('vdop'), + fset=lambda self, value: self._setDOP('vdop', value)) + + + _REPR_TEMPLATE = "" + + + def __repr__(self): + """ + Returns a string representation of positioning information object. + + @return: The string representation. + @rtype: C{str} + """ + return self._REPR_TEMPLATE % (self.pdop, self.hdop, self.vdop) + + + +class BeaconInformation(object): + """ + Information about positioning beacons (a generalized term for the reference + objects that help you determine your position, such as satellites or cell + towers). + + @ivar beacons: A set of visible beacons. Note that visible beacons are not + necessarily used in acquiring a postioning fix. + @type beacons: C{set} of L{IPositioningBeacon} + + @ivar usedBeacons: An iterable of the beacons that were used in obtaining a + positioning fix. This only contains beacons that are actually used, not + beacons of which it is unknown if they are used or not. This attribute + is immutable. + @type usedBeacons: iterable of L{IPositioningBeacon} + + @ivar seen: The amount of beacons that can be seen. This attribute is + immutable. + @type seen: C{int} + @ivar used: The amount of beacons that were used in obtaining the + positioning fix. This attribute is immutable. + @type used: C{int} + """ + def __init__(self, beacons=None): + """ + Initializes a beacon information object. + + @param beacons: A collection of beacons that will be present in this + beacon information object. + @type beacons: iterable of L{IPositioningBeacon} or C{Nonetype} + """ + self.beacons = set(beacons or []) + + + def _getUsedBeacons(self): + """ + Returns a generator of used beacons. + + @return: A generator containing all of the used positioning beacons. This + only contains beacons that are actually used, not beacons of which it + is unknown if they are used or not. + @rtype: iterable of L{PositioningBeacon} + """ + for beacon in self.beacons: + if beacon.isUsed: + yield beacon + + + usedBeacons = property(fget=_getUsedBeacons) + + + def _getNumberOfBeaconsSeen(self): + """ + Returns the number of beacons that can be seen. + + @return: The number of beacons that can be seen. + @rtype: C{int} + """ + return len(self.beacons) + + + seen = property(_getNumberOfBeaconsSeen) + + + def _getNumberOfBeaconsUsed(self): + """ + Returns the number of beacons that can be seen. + + @return: The number of beacons that can be seen, or C{None} if the number + is unknown. This happens as soon as one of the beacons has an unknown + (C{None}) C{isUsed} attribute. + @rtype: C{int} or C{NoneType} + """ + numberOfUsedBeacons = 0 + for beacon in self.beacons: + if beacon.isUsed is None: + return None + elif beacon.isUsed: + numberOfUsedBeacons += 1 + return numberOfUsedBeacons + + + used = property(_getNumberOfBeaconsUsed) + + + def __iter__(self): + """ + Yields the beacons in this beacon information object. + + @return: A generator producing the beacons in this beacon information + object. + @rtype: iterable of L{PositioningBeacon} + """ + for beacon in self.beacons: + yield beacon + + + def __repr__(self): + """ + Returns a string representation of this beacon information object. + + The beacons are sorted by their identifier. + + @return: The string representation. + @rtype: C{str} + """ + beaconReprs = ", ".join([repr(beacon) for beacon in + sorted(self.beacons, key=lambda x: x.identifier)]) + + if self.used is not None: + used = str(self.used) + else: + used = "?" + + return "" % ( + self.seen, used, beaconReprs) + + + +class PositioningBeacon(object): + """ + A positioning beacon. + + @ivar identifier: The unqiue identifier for this satellite. This is usually + an integer. For GPS, this is also known as the PRN. + @type identifier: Pretty much anything that can be used as a unique + identifier. Depends on the implementation. + @ivar isUsed: C{True} if the satellite is currently being used to obtain a + fix, C{False} if it is not currently being used, C{None} if unknown. + @type isUsed: C{bool} or C{NoneType} + """ + def __init__(self, identifier, isUsed=None): + """ + Initializes a positioning beacon. + + @param identifier: The identifier for this beacon. + @type identifier: Can be pretty much anything (see ivar documentation). + @param isUsed: Determines if this beacon is used in obtaining a + positioning fix (see the ivar documentation). + @type isUsed: C{bool} or C{NoneType} + """ + self.identifier = identifier + self.isUsed = isUsed + + + def __hash__(self): + """ + Returns the hash of the identifier for this beacon. + + @return: The hash of the identifier. (C{hash(self.identifier)}) + @rtype: C{int} + """ + return hash(self.identifier) + + + def _usedRepr(self): + """ + Returns a single character representation of the status of this + satellite in terms of being used for attaining a positioning fix. + + @return: One of ("Y", "N", "?") depending on the status of the + satellite. + @rtype: C{str} + """ + return {True: "Y", False: "N", None: "?"}[self.isUsed] + + + def __repr__(self): + """ + Returns a string representation of this beacon. + + @return: The string representation. + @rtype: C{str} + """ + return "" \ + % (self.identifier, self._usedRepr()) + + + +class Satellite(PositioningBeacon): + """ + A satellite. + + @ivar azimuth: The azimuth of the satellite. This is the heading (positive + angle relative to true north) where the satellite appears to be to the + device. + @ivar elevation: The (positive) angle above the horizon where this + satellite appears to be to the device. + @ivar signalToNoiseRatio: The signal to noise ratio of the signal coming + from this satellite. + """ + def __init__(self, + identifier, + azimuth=None, + elevation=None, + signalToNoiseRatio=None, + isUsed=None): + """ + Initializes a satellite object. + + @param identifier: The PRN (unique identifier) of this satellite. + @type identifier: C{int} + @param azimuth: The azimuth of the satellite (see instance variable + documentation). + @type azimuth: C{float} + @param elevation: The elevation of the satellite (see instance variable + documentation). + @type elevation: C{float} + @param signalToNoiseRatio: The signal to noise ratio of the connection + to this satellite (see instance variable documentation). + @type signalToNoiseRatio: C{float} + + """ + super(Satellite, self).__init__(int(identifier), isUsed) + + self.azimuth = azimuth + self.elevation = elevation + self.signalToNoiseRatio = signalToNoiseRatio + + + def __repr__(self): + """ + Returns a string representation of this Satellite. + + @return: The string representation. + @rtype: C{str} + """ + azimuth, elevation, snr = [{None: "?"}.get(x, x) + for x in self.azimuth, self.elevation, self.signalToNoiseRatio] + + properties = "azimuth: %s, elevation: %s, snr: %s" % ( + azimuth, elevation, snr) + + return "" % ( + self.identifier, properties, self._usedRepr()) diff --git c/twisted/positioning/ipositioning.py w/twisted/positioning/ipositioning.py new file mode 100644 index 0000000..403d738 --- /dev/null +++ w/twisted/positioning/ipositioning.py @@ -0,0 +1,117 @@ +# Copyright (c) 2009-2011 Twisted Matrix Laboratories. +# See LICENSE for details. +""" +Positioning interfaces. + +@since: 11.1 +""" +from zope.interface import Interface + + +class IPositioningReceiver(Interface): + """ + An interface for positioning providers. + """ + def positionReceived(latitude, longitude): + """ + Method called when a position is received. + + @param latitude: The latitude of the received position. + @type latitude: L{twisted.positioning.base.Coordinate} + @param longitude: The longitude of the received position. + @type longitude: L{twisted.positioning.base.Coordinate} + """ + + + def positionErrorReceived(positionError): + """ + Method called when position error is received. + + @param positioningError: The position error. + @type positioningError: L{twisted.positioning.base.PositionError} + """ + + def timeReceived(time): + """ + Method called when time and date information arrives. + + @param time: The date and time (expressed in UTC unless otherwise + specified). + @type time: L{datetime.datetime} + """ + + + def headingReceived(heading): + """ + Method called when a true heading is received. + + @param heading: The heading. + @type heading: L{twisted.positioning.base.Heading} + """ + + + def altitudeReceived(altitude): + """ + Method called when an altitude is received. + + @param altitude: The altitude. + @type altitude: L{twisted.positioning.base.Altitude} + """ + + + def speedReceived(speed): + """ + Method called when the speed is received. + + @param speed: The speed of a mobile object. + @type speed: L{twisted.positioning.base.Speed} + """ + + + def climbReceived(climb): + """ + Method called when the climb is received. + + @param climb: The climb of the mobile object. + @type climb: L{twisted.positioning.base.Climb} + """ + + def beaconInformationReceived(beaconInformation): + """ + Method called when positioning beacon information is received. + + @param beaconInformation: The beacon information. + @type beaconInformation: L{twisted.positioning.base.BeaconInformation} + """ + + + +class INMEAReceiver(Interface): + """ + An object that can receive NMEA data. + """ + def sentenceReceived(sentence): + """ + Method called when a sentence is received. + + @param sentence: The received NMEA sentence. + @type L{twisted.positioning.nmea.NMEASentence} + """ + + + +class IPositioningSentenceProducer(Interface): + """ + A protocol that produces positioning sentences. + + Implementing this protocol allows sentence classes to be automagically + generated for a particular protocol. + """ + def getSentenceAttributes(self): + """ + Returns a set of attributes that might be present in a sentence produced + by this sentence producer. + + @return: A set of attributes that might be present in a given sentence. + @rtype: C{set} of C{str} + """ diff --git c/twisted/positioning/nmea.py w/twisted/positioning/nmea.py new file mode 100644 index 0000000..c25aefe --- /dev/null +++ w/twisted/positioning/nmea.py @@ -0,0 +1,860 @@ +# -*- test-case-name: twisted.positioning.test.test_nmea -*- +# Copyright (c) 2009-2011 Twisted Matrix Laboratories. +# See LICENSE for details. +""" +Classes for working with NMEA (and vaguely NMEA-like) sentence producing +devices. + +@since: 11.1 +""" + +import itertools +import operator +import datetime +from zope.interface import implements, classProvides + +from twisted.protocols.basic import LineReceiver +from twisted.positioning import base, ipositioning +from twisted.positioning.base import LATITUDE, LONGITUDE, VARIATION + +# GPGGA fix quality: +(GGA_INVALID_FIX, GGA_GPS_FIX, GGA_DGPS_FIX, GGA_PPS_FIX, GGA_RTK_FIX, + GGA_FLOAT_RTK_FIX, GGA_DEAD_RECKONING, GGA_MANUAL_FIX, GGA_SIMULATED_FIX + ) = [str(x) for x in range(9)] + +# GPGLL/GPRMC fix quality: +DATA_ACTIVE, DATA_VOID = "A", "V" + +# Selection modes (used in a variety of sentences): +MODE_AUTO, MODE_MANUAL = 'A', 'M' + +# GPGSA fix types: +GSA_NO_FIX, GSA_2D_FIX, GSA_3D_FIX = '1', '2', '3' + +NMEA_NORTH, NMEA_EAST, NMEA_SOUTH, NMEA_WEST = "N", "E", "S", "W" + + +def split(sentence): + """ + Returns the split version of an NMEA sentence, minus header + and checksum. + + @param sentence: The NMEA sentence to split. + @type sentence: C{str} + + >>> split("$GPGGA,spam,eggs*00") + ['GPGGA', 'spam', 'eggs'] + """ + if sentence[-3] == "*": # sentence with checksum + return sentence[1:-3].split(',') + elif sentence[-1] == "*": # sentence without checksum + return sentence[1:-1].split(',') + else: + raise base.InvalidSentence("malformed sentence %s" % sentence) + + +def validateChecksum(sentence): + """ + Validates the checksum of an NMEA sentence. + + @param sentence: The NMEA sentence to check the checksum of. + @type sentence: C{str} + + @raise ValueError: If the sentence has an invalid checksum. + + Simply returns on sentences that either don't have a checksum, + or have a valid checksum. + """ + if sentence[-3] == '*': # sentence has a checksum + reference, source = int(sentence[-2:], 16), sentence[1:-3] + computed = reduce(operator.xor, (ord(x) for x in source)) + if computed != reference: + raise base.InvalidChecksum("%02x != %02x" % (computed, reference)) + + + +class NMEAProtocol(LineReceiver, base.PositioningSentenceProducerMixin): + """ + A protocol that parses and verifies the checksum of an NMEA sentence (in + string form, not L{NMEASentence}), and delegates to a receiver. + + It receives lines and verifies these lines are NMEA sentences. If + they are, verifies their checksum and unpacks them into their + components. It then wraps them in L{NMEASentence} objects and + calls the appropriate receiver method with them. + """ + classProvides(ipositioning.IPositioningSentenceProducer) + METHOD_PREFIX = "nmea_" + + def __init__(self, receiver): + """ + Initializes an NMEAProtocol. + + @param receiver: A receiver for NMEAProtocol sentence objects. + @type receiver: L{INMEAReceiver} + """ + self.receiver = receiver + + + def lineReceived(self, rawSentence): + """ + Parses the data from the sentence and validates the checksum. + + @param rawSentence: The MMEA positioning sentence. + @type rawSentence: C{str} + """ + sentence = rawSentence.strip() + + validateChecksum(sentence) + splitSentence = split(sentence) + + sentenceType, contents = splitSentence[0], splitSentence[1:] + + try: + keys = self.SENTENCE_CONTENTS[sentenceType] + except KeyError: + raise ValueError("unknown sentence type %s" % sentenceType) + + sentenceData = {"type": sentenceType} + for key, value in itertools.izip(keys, contents): + if key is not None and value != "": + sentenceData[key] = value + + sentence = NMEASentence(sentenceData) + + try: + callback = getattr(self, self.METHOD_PREFIX + sentenceType) + callback(sentence) + except AttributeError: + pass # No sentence-specific callback on the protocol + + if self.receiver is not None: + self.receiver.sentenceReceived(sentence) + + + SENTENCE_CONTENTS = { + 'GPGGA': [ + 'timestamp', + + 'latitudeFloat', + 'latitudeHemisphere', + 'longitudeFloat', + 'longitudeHemisphere', + + 'fixQuality', + 'numberOfSatellitesSeen', + 'horizontalDilutionOfPrecision', + + 'altitude', + 'altitudeUnits', + 'heightOfGeoidAboveWGS84', + 'heightOfGeoidAboveWGS84Units', + + # The next parts are DGPS information. + ], + + 'GPRMC': [ + 'timestamp', + + 'dataMode', + + 'latitudeFloat', + 'latitudeHemisphere', + 'longitudeFloat', + 'longitudeHemisphere', + + 'speedInKnots', + + 'trueHeading', + + 'datestamp', + + 'magneticVariation', + 'magneticVariationDirection', + ], + + 'GPGSV': [ + 'numberOfGSVSentences', + 'GSVSentenceIndex', + + 'numberOfSatellitesSeen', + + 'satellitePRN_0', + 'elevation_0', + 'azimuth_0', + 'signalToNoiseRatio_0', + + 'satellitePRN_1', + 'elevation_1', + 'azimuth_1', + 'signalToNoiseRatio_1', + + 'satellitePRN_2', + 'elevation_2', + 'azimuth_2', + 'signalToNoiseRatio_2', + + 'satellitePRN_3', + 'elevation_3', + 'azimuth_3', + 'signalToNoiseRatio_3', + ], + + 'GPGLL': [ + 'latitudeFloat', + 'latitudeHemisphere', + 'longitudeFloat', + 'longitudeHemisphere', + 'timestamp', + 'dataMode', + ], + + 'GPHDT': [ + 'trueHeading', + ], + + 'GPTRF': [ + 'datestamp', + 'timestamp', + + 'latitudeFloat', + 'latitudeHemisphere', + 'longitudeFloat', + 'longitudeHemisphere', + + 'elevation', + 'numberOfIterations', # unused + 'numberOfDopplerIntervals', # unused + 'updateDistanceInNauticalMiles', # unused + 'satellitePRN', + ], + + 'GPGSA': [ + 'dataMode', + 'fixType', + + 'usedSatellitePRN_0', + 'usedSatellitePRN_1', + 'usedSatellitePRN_2', + 'usedSatellitePRN_3', + 'usedSatellitePRN_4', + 'usedSatellitePRN_5', + 'usedSatellitePRN_6', + 'usedSatellitePRN_7', + 'usedSatellitePRN_8', + 'usedSatellitePRN_9', + 'usedSatellitePRN_10', + 'usedSatellitePRN_11', + + 'positionDilutionOfPrecision', + 'horizontalDilutionOfPrecision', + 'verticalDilutionOfPrecision', + ] + } + + +class NMEASentence(base.BaseSentence): + """ + An object representing an NMEA sentence. + + The attributes of this objects are raw NMEA protocol data, which + are all ASCII bytestrings. + + This object contains all the raw NMEA protocol data in a single + sentence. Not all of these necessarily have to be present in the + sentence. Missing attributes are None when accessed. + + Sentence-specific junk: + + @ivar type: The sentence type ("GPGGA", "GPGSV"...). + @ivar numberOfGSVSentences: The total number of GSV sentences in a + sequence. + @ivar GSVSentenceIndex: The index of this GSV sentence in the GSV + sequence. + + Time-related attributes: + + @ivar timestamp: A timestamp. ("123456" -> 12:34:56Z) + @ivar datestamp: A datestamp. ("230394" -> 23 Mar 1994) + + Location-related attributes: + + @ivar latitudeFloat: Latitude value. (for example: "1234.567" -> + 12 degrees, 34.567 minutes). + @ivar latitudeHemisphere: Latitudinal hemisphere ("N" or "S"). + @ivar longitudeFloat: Longitude value. See C{latitudeFloat} for an + example. + @ivar longitudeHemisphere: Longitudinal hemisphere ("E" or "W"). + @ivar altitude: The altitude above mean sea level. + @ivar altitudeUnits: Units in which altitude is expressed. (Always + "M" for meters.) + @ivar heightOfGeoidAboveWGS84: The local height of the geoid above + the WGS84 ellipsoid model. + @ivar heightOfGeoidAboveWGS84Units: The units in which the height + above the geoid is expressed. (Always "M" for meters.) + + Attributes related to direction and movement: + + @ivar trueHeading: The true heading. + @ivar magneticVariation: The magnetic variation. + @ivar magneticVariationDirection: The direction of the magnetic + variation. One of C{"E"} or C{"W"}. + @ivar speedInKnots: The ground speed, expressed in knots. + + Attributes related to fix and data quality: + + @ivar fixQuality: The quality of the fix. This is a single digit + from C{"0"} to C{"8"}. The important ones are C{"0"} (invalid + fix), C{"1"} (GPS fix) and C{"2"} (DGPS fix). + @ivar dataMode: Signals if the data is usable or not. One of + L{DATA_ACTIVE} or L{DATA_VOID}. + @ivar numberOfSatellitesSeen: The number of satellites seen by the + receiver. + @ivar numberOfSatellitesUsed: The number of satellites used in + computing the fix. + + Attributes related to precision: + + @ivar horizontalDilutionOfPrecision: The dilution of the precision of the + position on a plane tangential to the geoid. (HDOP) + @ivar verticalDilutionOfPrecision: As C{horizontalDilutionOfPrecision}, + but for a position on a plane perpendicular to the geoid. (VDOP) + @ivar positionDilutionOfPrecision: Euclidian norm of HDOP and VDOP. + + Attributes related to satellite-specific data: + + @ivar C{satellitePRN}: The unique identifcation number of a particular + satelite. Optionally suffixed with C{_N} if multiple satellites are + referenced in a sentence, where C{N in range(4)}. + @ivar C{elevation}: The elevation of a satellite in decimal degrees. + Optionally suffixed with C{_N}, as with C{satellitePRN}. + @ivar C{azimuth}: The azimuth of a satellite in decimal degrees. + Optionally suffixed with C{_N}, as with C{satellitePRN}. + @ivar C{signalToNoiseRatio}: The SNR of a satellite signal, in decibels. + Optionally suffixed with C{_N}, as with C{satellitePRN}. + @ivar C{usedSatellitePRN_N}: Where C{int(N) in range(12)}. The PRN + of a satelite used in computing the fix. + + """ + ALLOWED_ATTRIBUTES = NMEAProtocol.getSentenceAttributes() + + def _isFirstGSVSentence(self): + """ + Tests if this current GSV sentence is the first one in a sequence. + """ + return self.GSVSentenceIndex == "1" + + + def _isLastGSVSentence(self): + """ + Tests if this current GSV sentence is the final one in a sequence. + """ + return self.GSVSentenceIndex == self.numberOfGSVSentences + + + +class NMEAAdapter(object): + """ + An adapter from NMEAProtocol receivers to positioning receivers. + + @cvar DATESTAMP_HANDLING: Determines the way incomplete (two-digit) NMEA + datestamps are handled.. One of L{INTELLIGENT_DATESTAMPS} (default, + assumes dates are twenty-first century if the two-digit date is below + the L{INTELLIGENT_DATE_THRESHOLD}, twentieth century otherwise), + L{DATESTAMPS_FROM_20XX} (assumes all dates are twenty-first century), + L{DATESTAMPS_FROM_19XX} (assumes all dates are twentieth century). + All of these are class attributes of this class. + + @cvar INTELLIGENT_DATE_THRESHOLD: The threshold that determines which + century we guess a year is in. If the year value in a sentence is above + this value, assumes the 20th century (19xx), otherwise assumes the + twenty-first century (20xx). + @type INTELLIGENT_DATE_THRESHOLD: L{int} + """ + implements(ipositioning.INMEAReceiver) + + + def __init__(self, receiver): + """ + Initializes a new NMEA adapter. + + @param receiver: The receiver for positioning sentences. + @type receiver: L{twisted.positioning.IPositioningReceiver} + """ + self._state = {} + self._sentenceData = {} + self._receiver = receiver + + + def _fixTimestamp(self): + """ + Turns the NMEAProtocol timestamp notation into a datetime.time object. + The time in this object is expressed as Zulu time. + """ + timestamp = self.currentSentence.timestamp.split('.')[0] + timeObject = datetime.datetime.strptime(timestamp, '%H%M%S').time() + self._sentenceData['_time'] = timeObject + + + INTELLIGENT_DATESTAMPS = 0 + DATESTAMPS_FROM_20XX = 1 + DATESTAMPS_FROM_19XX = 2 + + DATESTAMP_HANDLING = INTELLIGENT_DATESTAMPS + INTELLIGENT_DATE_THRESHOLD = 80 + + + def _fixDatestamp(self): + """ + Turns an NMEA datestamp format into a C{datetime.date} object. + """ + datestamp = self.currentSentence.datestamp + + day, month, year = [int(ordinalString) for ordinalString in + (datestamp[0:2], datestamp[2:4], datestamp[4:6])] + + if self.DATESTAMP_HANDLING == self.INTELLIGENT_DATESTAMPS: + if year > self.INTELLIGENT_DATE_THRESHOLD: + year = int('19%02d' % year) + else: + year = int('20%02d' % year) + + elif self.DATESTAMP_HANDLING == self.DATESTAMPS_FROM_20XX: + year = int('20%02d' % year) + + elif self.DATESTAMP_HANDLING == self.DATESTAMPS_FROM_19XX: + year = int('19%02d' % year) + + else: + raise ValueError("unknown datestamp handling method (%s)" + % (self.DATESTAMP_HANDLING,)) + + self._sentenceData['_date'] = datetime.date(year, month, day) + + + def _fixCoordinateFloat(self, coordinateType): + """ + Turns the NMEAProtocol coordinate format into Python float. + + @param coordinateType: The coordinate type. Should be L{base.LATITUDE} + or L{base.LONGITUDE}. + """ + coordinateName = base.Coordinate.ANGLE_TYPE_NAMES[coordinateType] + key = coordinateName + 'Float' + nmeaCoordinate = getattr(self.currentSentence, key) + + left, right = nmeaCoordinate.split('.') + + degrees, minutes = int(left[:-2]), float("%s.%s" % (left[-2:], right)) + angle = degrees + minutes/60 + coordinate = base.Coordinate(angle, coordinateType) + self._sentenceData[coordinateName] = coordinate + + + def _fixHemisphereSign(self, coordinateType, sentenceDataKey=None): + """ + Fixes the sign for a hemisphere. + + This method must be called after the magnitude for the thing it + determines the sign of has been set. This is done by the following + functions: + + - C{self.FIXERS['magneticVariation']} + - C{self.FIXERS['latitudeFloat']} + - C{self.FIXERS['longitudeFloat']} + + @param coordinateType: Coordinate type. One of L{base.LATITUDE}, + L{base.LONGITUDE} or L{base.VARIATION}. + """ + sentenceDataKey = sentenceDataKey or coordinateType + sign = self._getHemisphereSign(coordinateType) + self._sentenceData[sentenceDataKey].setSign(sign) + + + COORDINATE_SIGNS = { + NMEA_NORTH: 1, + NMEA_EAST: 1, + NMEA_SOUTH: -1, + NMEA_WEST: -1 + } + + + def _getHemisphereSign(self, coordinateType): + """ + Returns the hemisphere sign for a given coordinate type. + + @param coordinateType: Coordinate type. One of L{base.LATITUDE}, + L{base.LONGITUDE} or L{base.VARIATION}. + """ + if coordinateType in (LATITUDE, LONGITUDE): + hemisphereKey = (base.Coordinate.ANGLE_TYPE_NAMES[coordinateType] + + 'Hemisphere') + elif coordinateType == VARIATION: + hemisphereKey = 'magneticVariationDirection' + else: + raise ValueError("unknown coordinate type %s" % (coordinateType,)) + + hemisphere = getattr(self.currentSentence, hemisphereKey) + + try: + return self.COORDINATE_SIGNS[hemisphere.upper()] + except KeyError: + raise ValueError("bad hemisphere/direction: %s" % hemisphere) + + + def _convert(self, sourceKey, converter=float, destinationKey=None): + """ + A simple conversion fix. + + @param sourceKey: The attribute name of the value to fix. + @type sourceKey: C{str} (Python identifier) + + @param converter: The function that converts the value. + @type converter: unary callable + + @param destinationKey: The target attribute key. If unset or + C{None}, same as C{sourceKey}. + @type destinationKey: C{str} (Python identifier) + """ + currentValue = getattr(self.currentSentence, sourceKey) + + if destinationKey is None: + destinationKey = sourceKey + + self._sentenceData[destinationKey] = converter(currentValue) + + + + STATEFUL_UPDATE = { + # sentenceKey: (stateKey, factory, attributeName, converter), + 'trueHeading': + ('heading', base.Heading, '_angle', float), + 'magneticVariation': + ('heading', base.Heading, 'variation', + lambda angle: base.Angle(float(angle), VARIATION)), + + 'horizontalDilutionOfPrecision': + ('positionError', base.PositionError, 'hdop', float), + 'verticalDilutionOfPrecision': + ('positionError', base.PositionError, 'vdop', float), + 'positionDilutionOfPrecision': + ('positionError', base.PositionError, 'pdop', float), + + } + + + def _statefulUpdate(self, sentenceKey): + """ + Does a stateful update of a particular positioning attribute. + + @param sentenceKey: The name of the key in the sentence attributes, + C{NMEAAdapter.STATEFUL_UPDATE} dictionary and the adapter state. + @type sentenceKey: C{str} + """ + state, factory, attr, converter = self.STATEFUL_UPDATE[sentenceKey] + + if state not in self._sentenceData: + self._sentenceData[state] = self._state.get(state, factory()) + + newValue = converter(getattr(self.currentSentence, sentenceKey)) + setattr(self._sentenceData[state], attr, newValue) + + + ACCEPTABLE_UNITS = frozenset(['M']) + UNIT_CONVERTERS = { + 'N': lambda inKnots: base.Speed(float(inKnots) * base.MPS_PER_KNOT), + 'K': lambda inKPH: base.Speed(float(inKPH) * base.MPS_PER_KPH), + } + + + def _fixUnits(self, unitKey=None, valueKey=None, sourceKey=None, unit=None): + """ + Fixes the units of a certain value. + + @param unit: The unit that is being converted I{from}. If unspecified + or None, asks the current sentence for the C{unitKey}. If that also + fails, raises C{AttributeError}. + @type unit: C{str} + @param unitKey: The name of the key/attribute under which the unit can + be found in the current sentence. If the C{unit} parameter is set, + this parameter is not used. + @type unitKey: C{str} + @param sourceKey: The name of the key/attribute that contains the + current value to be converted (expressed in units as defined + according to the the C{unit} parameter). If unset, will use the + same key as the value key. + @type sourceKey: C{str} + @param valueKey: The key name in which the data will be stored in the + C{_sentenceData} instance attribute. If unset, attempts to strip + "Units" from the C{unitKey} parameter. + @type valueKey: C{str} + + None of the keys are allowed to be the empty string. + """ + unit = unit or getattr(self.currentSentence, unitKey) + valueKey = valueKey or unitKey.strip('Units') + sourceKey = sourceKey or valueKey + + if unit not in self.ACCEPTABLE_UNITS: + converter = self.UNIT_CONVERTERS[unit] + currentValue = getattr(self.currentSentence, sourceKey) + self._sentenceData[valueKey] = converter(currentValue) + + + GSV_KEYS = "satellitePRN", "azimuth", "elevation", "signalToNoiseRatio" + + + def _fixGSV(self): + """ + Parses partial visible satellite information from a GSV sentence. + """ + # To anyone who knows NMEA, this method's name should raise a chuckle's + # worth of schadenfreude. 'Fix' GSV? Hah! Ludicrous. + self._sentenceData['_partialBeaconInformation'] = base.BeaconInformation() + + for index in range(4): + keys = ["%s_%i" % (key, index) for key in self.GSV_KEYS] + values = [getattr(self.currentSentence, k) for k in keys] + prn, azimuth, elevation, snr = values + + if prn is None or snr is None: + # The peephole optimizer optimizes the jump away, meaning that + # coverage.py isn't covered. It is. Replace it with break and + # watch the test case fail. + # ML thread about this issue: http://goo.gl/1KNUi + # Related CPython bug: http://bugs.python.org/issue2506 + continue # pragma: no cover + + satellite = base.Satellite(prn, azimuth, elevation, snr) + bi = self._sentenceData['_partialBeaconInformation'] + bi.beacons.add(satellite) + + + def _fixGSA(self): + """ + Extracts the information regarding which satellites were used in + obtaining the GPS fix from a GSA sentence. + + @precondition: A GSA sentence was fired. + @postcondition: The current sentence data (C{self._sentenceData} will + contain a set of the currently used PRNs (under the key + C{_usedPRNs}. + """ + self._sentenceData['_usedPRNs'] = set() + for key in ("usedSatellitePRN_%d" % x for x in range(12)): + prn = getattr(self.currentSentence, key, None) + if prn is not None: + self._sentenceData['_usedPRNs'].add(int(prn)) + + + SPECIFIC_SENTENCE_FIXES = { + 'GPGSV': _fixGSV, + 'GPGSA': _fixGSA, + } + + + def _sentenceSpecificFix(self): + """ + Executes a fix for a specific type of sentence. + """ + fixer = self.SPECIFIC_SENTENCE_FIXES.get(self.currentSentence.type) + if fixer is not None: + fixer(self) + + + FIXERS = { + 'type': + lambda self: self._sentenceSpecificFix(), + + 'timestamp': + lambda self: self._fixTimestamp(), + 'datestamp': + lambda self: self._fixDatestamp(), + + 'latitudeFloat': + lambda self: self._fixCoordinateFloat(LATITUDE), + 'latitudeHemisphere': + lambda self: self._fixHemisphereSign(LATITUDE, 'latitude'), + 'longitudeFloat': + lambda self: self._fixCoordinateFloat(LONGITUDE), + 'longitudeHemisphere': + lambda self: self._fixHemisphereSign(LONGITUDE, 'longitude'), + + 'altitude': + lambda self: self._convert('altitude', + converter=lambda strRepr: base.Altitude(float(strRepr))), + 'altitudeUnits': + lambda self: self._fixUnits(unitKey='altitudeUnits'), + + 'heightOfGeoidAboveWGS84': + lambda self: self._convert('heightOfGeoidAboveWGS84', + converter=lambda strRepr: base.Altitude(float(strRepr))), + 'heightOfGeoidAboveWGS84Units': + lambda self: self._fixUnits( + unitKey='heightOfGeoidAboveWGS84Units'), + + 'trueHeading': + lambda self: self._statefulUpdate('trueHeading'), + 'magneticVariation': + lambda self: self._statefulUpdate('magneticVariation'), + + 'magneticVariationDirection': + lambda self: self._fixHemisphereSign(VARIATION, + 'heading'), + + 'speedInKnots': + lambda self: self._fixUnits(valueKey='speed', + sourceKey='speedInKnots', + unit='N'), + + 'positionDilutionOfPrecision': + lambda self: self._statefulUpdate('positionDilutionOfPrecision'), + 'horizontalDilutionOfPrecision': + lambda self: self._statefulUpdate('horizontalDilutionOfPrecision'), + 'verticalDilutionOfPrecision': + lambda self: self._statefulUpdate('verticalDilutionOfPrecision'), + } + + + def clear(self): + """ + Resets this adapter. + + This will empty the adapter state and the current sentence data. + """ + self._state = {} + self._sentenceData = {} + + + def sentenceReceived(self, sentence): + """ + Called when a sentence is received. + + Will clean the received NMEAProtocol sentence up, and then update the + adapter's state, followed by firing the callbacks. + + If the received sentence was invalid, the state will be cleared. + + @param sentence: The sentence that is received. + @type sentence: L{NMEASentence} + """ + self.currentSentence = sentence + + try: + self._validateCurrentSentence() + self._cleanCurrentSentence() + except base.InvalidSentence: + self.clear() + + self._updateSentence() + self._fireSentenceCallbacks() + + + def _validateCurrentSentence(self): + """ + Tests if a sentence contains a valid fix. + """ + if (self.currentSentence.fixQuality == GGA_INVALID_FIX + or self.currentSentence.dataMode == DATA_VOID + or self.currentSentence.fixType == GSA_NO_FIX): + raise base.InvalidSentence("bad sentence") + + + def _cleanCurrentSentence(self): + """ + Cleans the current sentence. + """ + for key in sorted(self.currentSentence.presentAttributes): + fixer = self.FIXERS.get(key, None) + + if fixer is not None: + fixer(self) + + + def _updateSentence(self): + """ + Updates the current state with the new information from the sentence. + """ + self._updateBeaconInformation() + self._combineDateAndTime() + self._state.update(self._sentenceData) + + + def _updateBeaconInformation(self): + """ + Updates existing beacon information state with new data. + """ + new = self._sentenceData.get('_partialBeaconInformation') + if new is None: + return + + usedPRNs = (self._state.get('_usedPRNs') + or self._sentenceData.get('_usedPRNs')) + if usedPRNs is not None: + for beacon in new.beacons: + beacon.isUsed = (beacon.identifier in usedPRNs) + + old = self._state.get('_partialBeaconInformation') + if old is not None: + new.beacons.update(old.beacons) + + if self.currentSentence._isLastGSVSentence(): + if not self.currentSentence._isFirstGSVSentence(): + # not a 1-sentence sequence, get rid of partial information + del self._state['_partialBeaconInformation'] + bi = self._sentenceData.pop('_partialBeaconInformation') + self._sentenceData['beaconInformation'] = bi + + + def _combineDateAndTime(self): + """ + Combines a C{datetime.date} object and a C{datetime.time} object, + collected from one or more NMEA sentences, into a single + C{datetime.datetime} object suitable for sending to the + L{IPositioningReceiver}. + """ + if not ('_date' in self._sentenceData or '_time' in self._sentenceData): + return + + date, time = [self._sentenceData.get(key) or self._state.get(key) + for key in ('_date', '_time')] + + if date is None or time is None: + return + + dt = datetime.datetime.combine(date, time) + self._sentenceData['time'] = dt + + + def _fireSentenceCallbacks(self): + """ + Fires sentence callbacks for the current sentence. + + A callback will only fire if all of the keys it requires are present in + the current state and at least one such field was altered in the + current sentence. + + The callbacks will only be fired with data from L{self._state}. + """ + for callbackName, requiredFields in self.REQUIRED_CALLBACK_FIELDS.items(): + callback = getattr(self._receiver, callbackName) + + kwargs = {} + atLeastOnePresentInSentence = False + + try: + for field in requiredFields: + if field in self._sentenceData: + atLeastOnePresentInSentence = True + kwargs[field] = self._state[field] + except KeyError: + continue + + if atLeastOnePresentInSentence: + callback(**kwargs) + + + +NMEAAdapter.REQUIRED_CALLBACK_FIELDS = dict( + (name, method.positional) for name, method + in ipositioning.IPositioningReceiver.namesAndDescriptions()) diff --git c/twisted/positioning/test/__init__.py w/twisted/positioning/test/__init__.py new file mode 100644 index 0000000..fcd5611 --- /dev/null +++ w/twisted/positioning/test/__init__.py @@ -0,0 +1,5 @@ +# Copyright (c) 2009-2011 Twisted Matrix Laboratories. +# See LICENSE for details. +""" +Tests for the Twisted positioning framework. +""" diff --git c/twisted/positioning/test/test_base.py w/twisted/positioning/test/test_base.py new file mode 100644 index 0000000..95fdba6 --- /dev/null +++ w/twisted/positioning/test/test_base.py @@ -0,0 +1,802 @@ +# Copyright (c) 2009-2011 Twisted Matrix Laboratories. +# See LICENSE for details. +""" +Test cases for positioning primitives. +""" +from twisted.trial.unittest import TestCase +from twisted.positioning import base +from twisted.positioning.base import LATITUDE, LONGITUDE +from twisted.positioning.base import NORTH, EAST, SOUTH, WEST + + +class AngleTests(TestCase): + """ + Tests for the L{twisted.positioning.base.Angle} class. + """ + def test_empty(self): + """ + Tests the repr of an empty angle. + """ + a = base.Angle() + self.assertEquals("", repr(a)) + + + def test_variation(self): + """ + Tests the repr of an empty variation. + """ + a = base.Angle(angleType=base.VARIATION) + self.assertEquals("", repr(a)) + + + def test_unknownType(self): + """ + Tests the repr of an unknown angle of a 1 decimal degree value. + """ + a = base.Angle(1.0) + self.assertEquals("", repr(a)) + + + +class HeadingTests(TestCase): + """ + Tests for the L{twisted.positioning.base.Heading} class. + """ + def test_simple(self): + """ + Tests some of the basic features of a very simple heading. + """ + h = base.Heading(1.) + self.assertEquals(h.inDecimalDegrees, 1.) + self.assertEquals(h.variation, None) + self.assertEquals(h.correctedHeading, None) + self.assertEquals(float(h), 1.) + + + def test_headingWithoutVariationRepr(self): + """ + Tests the repr of a heading without a variation. + """ + h = base.Heading(1.) + self.assertEquals(repr(h), "") + + + def test_headingWithVariationRepr(self): + """ + Tests the repr of a heading with a variation. + """ + angle, variation = 1.0, -10.0 + h = base.Heading.fromFloats(angle, variationValue=variation) + + variationRepr = '' % (variation,) + expectedRepr = '' % (angle, variationRepr) + self.assertEquals(repr(h), expectedRepr) + + + def test_equality(self): + """ + Tests if equal headings compare equal. + """ + self.assertEquals(base.Heading(1.), base.Heading(1.)) + + + def test_inequality(self): + """ + Tests if unequal headings compare unequal. + """ + self.assertNotEquals(base.Heading(1.), base.Heading(2.)) + + + def test_edgeCases(self): + """ + Tests that the two edge cases of a heading value of zero and a heading + value of zero with a variation of C{180.0} don't fail. + """ + base.Heading(0) + base.Heading(0, 180) + + + def _badValueTest(self, **kw): + """ + Helper function for verifying that bad values raise C{ValueError}. + + Passes C{**kw} to L{base.Heading.fromFloats}, and checks if that raises. + """ + self.assertRaises(ValueError, base.Heading.fromFloats, **kw) + + + def test_badAngleValueEdgeCase(self): + """ + Tests that a heading with value C{360.0} fails. + """ + self._badValueTest(angleValue=360.0) + + + def test_badVariationEdgeCase(self): + """ + Tests that a variation of C{-180.0} fails. + """ + self._badValueTest(variationValue=-180.0) + + + def test_negativeHeading(self): + """ + Tests that negative heading values cause C{ValueError}. + """ + self._badValueTest(angleValue=-10.0) + + + def test_headingTooLarge(self): + """ + Tests that an angle value larger than C{360.0} raises C{ValueError}. + """ + self._badValueTest(angleValue=370.0) + + + def test_variationTooNegative(self): + """ + Tests that variation values less than C{-180.0} fail. + """ + self._badValueTest(variationValue=-190.0) + + + def test_variationTooPositive(self): + """ + Tests that variation values greater than C{-180.0} fail. + """ + self._badValueTest(variationValue=190.0) + + + def test_correctedHeading(self): + """ + Simple test for a corrected heading. + """ + h = base.Heading.fromFloats(1., variationValue=-10.) + self.assertEquals(h.correctedHeading, base.Angle(11., base.HEADING)) + + + def test_correctedHeadingOverflow(self): + """ + Tests that a corrected heading that comes out above 360 degrees is + correctly handled. + """ + h = base.Heading.fromFloats(359., variationValue=-2.) + self.assertEquals(h.correctedHeading, base.Angle(1., base.HEADING)) + + + def test_correctedHeadingOverflowEdgeCase(self): + """ + Tests that a corrected heading that comes out to exactly 360 degrees + is correctly handled. + """ + h = base.Heading.fromFloats(359., variationValue=-1.) + self.assertEquals(h.correctedHeading, base.Angle(0., base.HEADING)) + + + def test_correctedHeadingUnderflow(self): + """ + Tests that a corrected heading that comes out under 0 degrees is + correctly handled. + """ + h = base.Heading.fromFloats(1., variationValue=2.) + self.assertEquals(h.correctedHeading, base.Angle(359., base.HEADING)) + + + def test_correctedHeadingUnderflowEdgeCase(self): + """ + Tests that a corrected heading that comes out under 0 degrees is + correctly handled. + """ + h = base.Heading.fromFloats(1., variationValue=1.) + self.assertEquals(h.correctedHeading, base.Angle(0., base.HEADING)) + + + def test_setVariationSign(self): + """ + Tests that setting the sign on a variation works. + """ + h = base.Heading.fromFloats(1., variationValue=1.) + h.setSign(1) + self.assertEquals(h.variation.inDecimalDegrees, 1.) + h.setSign(-1) + self.assertEquals(h.variation.inDecimalDegrees, -1.) + + + def test_setBadVariationSign(self): + """ + Tests that setting invalid sign values on a variation fails + predictably. + """ + h = base.Heading.fromFloats(1., variationValue=1.) + self.assertRaises(ValueError, h.setSign, -50) + self.assertEquals(h.variation.inDecimalDegrees, 1.) + + self.assertRaises(ValueError, h.setSign, 0) + self.assertEquals(h.variation.inDecimalDegrees, 1.) + + self.assertRaises(ValueError, h.setSign, 50) + self.assertEquals(h.variation.inDecimalDegrees, 1.) + + + def test_setUnknownVariationSign(self): + """ + Tests that setting an otherwise correct sign on an unknown variation + fails predictably. + """ + h = base.Heading.fromFloats(1.) + self.assertEquals(None, h.variation.inDecimalDegrees) + self.assertRaises(ValueError, h.setSign, 1) + + + +class CoordinateTests(TestCase): + def test_simple(self): + """ + Test that coordinates are convertible into a float, and verifies the + generic coordinate repr. + """ + value = 10.0 + c = base.Coordinate(value) + self.assertEquals(float(c), value) + expectedRepr = "" % (value,) + self.assertEquals(repr(c), expectedRepr) + + + def test_positiveLatitude(self): + """ + Tests creating positive latitudes and verifies their repr. + """ + value = 50.0 + c = base.Coordinate(value, LATITUDE) + self.assertEquals(repr(c), "" % value) + + + def test_negativeLatitude(self): + """ + Tests creating negative latitudes and verifies their repr. + """ + value = -50.0 + c = base.Coordinate(value, LATITUDE) + self.assertEquals(repr(c), "" % value) + + + def test_positiveLongitude(self): + """ + Tests creating positive longitudes and verifies their repr. + """ + value = 50.0 + c = base.Coordinate(value, LONGITUDE) + self.assertEquals(repr(c), "" % value) + + + def test_negativeLongitude(self): + """ + Tests creating negative longitudes and verifies their repr. + """ + value = -50.0 + c = base.Coordinate(value, LONGITUDE) + self.assertEquals(repr(c), "" % value) + + + def test_badCoordinateType(self): + """ + Tests that creating coordinates with bogus types raises C{ValueError}. + """ + self.assertRaises(ValueError, base.Coordinate, 150.0, "BOGUS") + + + def test_equality(self): + """ + Tests that equal coordinates compare equal. + """ + self.assertEquals(base.Coordinate(1.0), base.Coordinate(1.0)) + + + def test_differentAnglesInequality(self): + """ + Tests that coordinates with different angles compare unequal. + """ + c1 = base.Coordinate(1.0) + c2 = base.Coordinate(-1.0) + self.assertNotEquals(c1, c2) + + + def test_differentTypesInequality(self): + """ + Tests that coordinates with the same angles but different types + compare unequal. + """ + c1 = base.Coordinate(1.0, LATITUDE) + c2 = base.Coordinate(1.0, LONGITUDE) + self.assertNotEquals(c1, c2) + + + def test_sign(self): + """ + Tests that setting the sign on a coordinate works. + """ + c = base.Coordinate(50., LATITUDE) + c.setSign(1) + self.assertEquals(c.inDecimalDegrees, 50.) + c.setSign(-1) + self.assertEquals(c.inDecimalDegrees, -50.) + + + def test_badVariationSign(self): + """ + Tests that setting a bogus sign value on a coordinate raises + C{ValueError} and doesn't affect the coordinate. + """ + value = 50.0 + c = base.Coordinate(value, LATITUDE) + + self.assertRaises(ValueError, c.setSign, -50) + self.assertEquals(c.inDecimalDegrees, 50.) + + self.assertRaises(ValueError, c.setSign, 0) + self.assertEquals(c.inDecimalDegrees, 50.) + + self.assertRaises(ValueError, c.setSign, 50) + self.assertEquals(c.inDecimalDegrees, 50.) + + + def test_hemispheres(self): + """ + Checks that coordinates know which hemisphere they're in. + """ + coordinatesAndHemispheres = [ + (base.Coordinate(1.0, LATITUDE), NORTH), + (base.Coordinate(-1.0, LATITUDE), SOUTH), + (base.Coordinate(1.0, LONGITUDE), EAST), + (base.Coordinate(-1.0, LONGITUDE), WEST), + ] + + for coordinate, expectedHemisphere in coordinatesAndHemispheres: + self.assertEquals(expectedHemisphere, coordinate.hemisphere) + + + def test_badHemisphere(self): + """ + Checks that asking for a hemisphere when the coordinate doesn't know + raises C{ValueError}. + """ + c = base.Coordinate(1.0, None) + self.assertRaises(ValueError, lambda: c.hemisphere) + + + def test_badLatitudeValues(self): + """ + Tests that latitudes outside of M{-90.0 < latitude < 90.0} raise + C{ValueError}. + """ + self.assertRaises(ValueError, base.Coordinate, 150.0, LATITUDE) + self.assertRaises(ValueError, base.Coordinate, -150.0, LATITUDE) + + + def test_badLongitudeValues(self): + """ + Tests that longitudes outside of M{-180.0 < longitude < 180.0} raise + C{ValueError}. + """ + self.assertRaises(ValueError, base.Coordinate, 250.0, LONGITUDE) + self.assertRaises(ValueError, base.Coordinate, -250.0, LONGITUDE) + + + def test_inDegreesMinutesSeconds(self): + """ + Tests accessing coordinate values in degrees, minutes and seconds. + """ + c = base.Coordinate(50.5, LATITUDE) + self.assertEquals(c.inDegreesMinutesSeconds, (50, 30, 0)) + + c = base.Coordinate(50.213, LATITUDE) + self.assertEquals(c.inDegreesMinutesSeconds, (50, 12, 46)) + + + def test_unknownAngleInDegreesMinutesSeconds(self): + """ + Tests accessing unknown coordinate values in degrees, minutes + and seconds. + """ + c = base.Coordinate(None, None) + self.assertEquals(c.inDegreesMinutesSeconds, None) + + + +class AltitudeTests(TestCase): + """ + Tests for the L{twisted.positioning.base.Altitude} class. + """ + def test_simple(self): + """ + Tests basic altitude functionality. + """ + a = base.Altitude(1.) + self.assertEquals(float(a), 1.) + self.assertEquals(a.inMeters, 1.) + self.assertEquals(a.inFeet, 1./base.METERS_PER_FOOT) + self.assertEquals(repr(a), "") + + + def test_equality(self): + """ + Tests that equal altitudes compare equal. + """ + a1 = base.Altitude(1.) + a2 = base.Altitude(1.) + self.assertEquals(a1, a2) + + + def test_inequality(self): + """ + Tests that unequal altitudes compare unequal. + """ + a1 = base.Altitude(1.) + a2 = base.Altitude(-1.) + self.assertNotEquals(a1, a2) + + + +class SpeedTests(TestCase): + """ + Tests for the L{twisted.positioning.base.Speed} class. + """ + def test_simple(self): + """ + Tests basic speed functionality. + """ + s = base.Speed(50.0) + self.assertEquals(s.inMetersPerSecond, 50.0) + self.assertEquals(float(s), 50.0) + self.assertEquals(repr(s), "") + + + def test_negativeSpeeds(self): + """ + Tests that negative speeds raise C{ValueError}. + """ + self.assertRaises(ValueError, base.Speed, -1.0) + + + def test_inKnots(self): + """ + Tests that speeds can be converted into knots correctly. + """ + s = base.Speed(1.0) + self.assertEquals(1/base.MPS_PER_KNOT, s.inKnots) + + + def test_asFloat(self): + """ + Tests that speeds can be converted into C{float}s correctly. + """ + self.assertEquals(1.0, float(base.Speed(1.0))) + + + +class ClimbTests(TestCase): + """ + Tests for L{twisted.positioning.base.Climb}. + """ + def test_simple(self): + """ + Basic functionality for climb objects. + """ + s = base.Climb(42.) + self.assertEquals(s.inMetersPerSecond, 42.) + self.assertEquals(float(s), 42.) + self.assertEquals(repr(s), "") + + + def test_negativeClimbs(self): + """ + Tests that creating negative climbs works. + """ + base.Climb(-42.) + + + def test_speedInKnots(self): + """ + Tests that climbs can be converted into knots correctly. + """ + s = base.Climb(1.0) + self.assertEquals(1/base.MPS_PER_KNOT, s.inKnots) + + + def test_asFloat(self): + """ + Tests that speeds can be converted into C{float}s correctly. + """ + self.assertEquals(1.0, float(base.Climb(1.0))) + + + +class PositionErrorTests(TestCase): + """ + Tests for L{twisted.positioning.base.PositionError}. + """ + def test_allUnset(self): + """ + Tests that creating an empty L{PositionError} works without checking + the invariant. + """ + pe = base.PositionError() + for x in (pe.pdop, pe.hdop, pe.vdop): + self.assertEquals(None, x) + + + def test_allUnsetWithInvariant(self): + """ + Tests that creating an empty L{PositionError} works while checking the + invariant. + """ + pe = base.PositionError(testInvariant=True) + for x in (pe.pdop, pe.hdop, pe.vdop): + self.assertEquals(None, x) + + + def test_simpleWithoutInvariant(self): + """ + Tests that creating a simple L{PositionError} with just a HDOP without + checking the invariant works. + """ + base.PositionError(hdop=1.0) + + + def test_simpleWithInvariant(self): + """ + Tests that creating a simple L{PositionError} with just a HDOP while + checking the invariant works. + """ + base.PositionError(hdop=1.0, testInvariant=True) + + + def test_invalidWithoutInvariant(self): + """ + Tests that creating a simple L{PositionError} with all values set + without checking the invariant works. + """ + base.PositionError(pdop=1.0, vdop=1.0, hdop=1.0) + + + def test_invalidWithInvariant(self): + """ + Tests that creating a simple L{PositionError} with all values set to + inconsistent values while checking the invariant raises C{ValueError}. + """ + self.assertRaises(ValueError, base.PositionError, + pdop=1.0, vdop=1.0, hdop=1.0, testInvariant=True) + + + def test_setDOPWithoutInvariant(self): + """ + Tests that setting the PDOP value (with HDOP and VDOP already known) + to an inconsistent value without checking the invariant works. + """ + pe = base.PositionError(hdop=1.0, vdop=1.0) + pe.pdop = 100.0 + self.assertEquals(pe.pdop, 100.0) + + + def test_setDOPWithInvariant(self): + """ + Tests that setting the PDOP value (with HDOP and VDOP already known) + to an inconsistent value while checking the invariant raises + C{ValueError}. + """ + pe = base.PositionError(hdop=1.0, vdop=1.0, testInvariant=True) + pdop = pe.pdop + + def setPDOP(pe): + pe.pdop = 100.0 + + self.assertRaises(ValueError, setPDOP, pe) + self.assertEqual(pe.pdop, pdop) + + + REPR_TEMPLATE = "" + + + def _testDOP(self, pe, pdop, hdop, vdop): + """ + Tests the DOP values in a position error, and the repr of that + position error. + """ + self.assertEquals(pe.pdop, pdop) + self.assertEquals(pe.hdop, hdop) + self.assertEquals(pe.vdop, vdop) + self.assertEquals(repr(pe), self.REPR_TEMPLATE % (pdop, hdop, vdop)) + + + def test_positionAndHorizontalSet(self): + """ + Tests that the VDOP is correctly determined from PDOP and HDOP. + """ + pdop, hdop = 2.0, 1.0 + vdop = (pdop**2 - hdop**2)**.5 + pe = base.PositionError(pdop=pdop, hdop=hdop) + self._testDOP(pe, pdop, hdop, vdop) + + + def test_positionAndVerticalSet(self): + """ + Tests that the HDOP is correctly determined from PDOP and VDOP. + """ + pdop, vdop = 2.0, 1.0 + hdop = (pdop**2 - vdop**2)**.5 + pe = base.PositionError(pdop=pdop, vdop=vdop) + self._testDOP(pe, pdop, hdop, vdop) + + + def test_horizontalAndVerticalSet(self): + """ + Tests that the PDOP is correctly determined from HDOP and VDOP. + """ + hdop, vdop = 1.0, 1.0 + pdop = (hdop**2 + vdop**2)**.5 + pe = base.PositionError(hdop=hdop, vdop=vdop) + self._testDOP(pe, pdop, hdop, vdop) + + + +class BeaconInformationTests(TestCase): + """ + Tests for L{twisted.positioning.base.BeaconInformation}. + """ + def test_minimal(self): + """ + Tests some basic features of a minimal beacon information object. + + Tests the number of used beacons is zero, the total number of + beacons (the number of seen beacons) is zero, and the repr of + the object. + """ + bi = base.BeaconInformation() + self.assertEquals(len(list(bi.usedBeacons)), 0) + self.assertEquals(len(list(bi)), 0) + self.assertEquals(repr(bi), + "") + + + satelliteKwargs = {"azimuth": 1, "elevation": 1, "signalToNoiseRatio": 1.} + + + def test_simple(self): + """ + Tests a beacon information with a bunch of satellites, none of + which used in computing a fix. + """ + def _buildSatellite(**kw): + kwargs = dict(self.satelliteKwargs) + kwargs.update(kw) + return base.Satellite(isUsed=None, **kwargs) + + beacons = set() + for prn in range(1, 10): + beacons.add(_buildSatellite(identifier=prn)) + + bi = base.BeaconInformation(beacons) + + self.assertEquals(len(list(bi.usedBeacons)), 0) + self.assertEquals(bi.used, None) + self.assertEquals(len(list(bi)), 9) + self.assertEquals(repr(bi), + ", " + ", " + ", " + ", " + ", " + ", " + ", " + ", " + "" + "})>") + + + def test_someSatellitesUsed(self): + """ + Tests a beacon information with a bunch of satellites, some of + them used in computing a fix. + """ + def _buildSatellite(**kw): + kwargs = dict(self.satelliteKwargs) + kwargs.update(kw) + return base.Satellite(**kwargs) + + beacons = set() + for prn in range(1, 10): + isUsed = bool(prn % 2) + satellite = _buildSatellite(identifier=prn, isUsed=isUsed) + beacons.add(satellite) + + bi = base.BeaconInformation(beacons) + + self.assertEquals(len(list(bi.usedBeacons)), 5) + self.assertEquals(bi.used, 5) + self.assertEquals(len(list(bi)), 9) + self.assertEquals(len(bi.beacons), 9) + self.assertEquals(bi.seen, 9) + self.assertEquals(repr(bi), + ", " + ", " + ", " + ", " + ", " + ", " + ", " + ", " + "" + "})>") + + + +class PositioningBeaconTests(TestCase): + """ + Tests for L{twisted.positioning.base.PositioningBeacon}. + """ + def test_usedRepr(self): + """ + Tests the repr of a positioning beacon being used. + """ + s = base.PositioningBeacon("A", True) + self.assertEquals(repr(s), "") + + + def test_unusedRepr(self): + """ + Tests the repr of a positioning beacon not being used. + """ + s = base.PositioningBeacon("A", False) + self.assertEquals(repr(s), "") + + + def test_dontKnowIfUsed(self): + """ + Tests the repr of a positioning beacon that might be used. + """ + s = base.PositioningBeacon("A", None) + self.assertEquals(repr(s), "") + + + +class SatelliteTests(TestCase): + """ + Tests for L{twisted.positioning.base.Satellite}. + """ + def test_minimal(self): + """ + Tests a minimal satellite that only has a known PRN. + + Tests that the azimuth, elevation and signal to noise ratios + are C{None} and verifies the repr. + """ + s = base.Satellite(1) + self.assertEquals(s.identifier, 1) + self.assertEquals(s.azimuth, None) + self.assertEquals(s.elevation, None) + self.assertEquals(s.signalToNoiseRatio, None) + self.assertEquals(repr(s), "") + + + def test_simple(self): + """ + Tests a minimal satellite that only has a known PRN. + + Tests that the azimuth, elevation and signal to noise ratios + are correct and verifies the repr. + """ + s = base.Satellite(identifier=1, + azimuth=270., + elevation=30., + signalToNoiseRatio=25., + isUsed=True) + + self.assertEquals(s.identifier, 1) + self.assertEquals(s.azimuth, 270.) + self.assertEquals(s.elevation, 30.) + self.assertEquals(s.signalToNoiseRatio, 25.) + self.assertEquals(repr(s), "") diff --git c/twisted/positioning/test/test_nmea.py w/twisted/positioning/test/test_nmea.py new file mode 100644 index 0000000..d574740 --- /dev/null +++ w/twisted/positioning/test/test_nmea.py @@ -0,0 +1,1184 @@ +# Copyright (c) 2009-2011 Twisted Matrix Laboratories. +# See LICENSE for details. +""" +Test cases for using NMEA sentences. +""" +import datetime +from zope.interface import implements + +from twisted.positioning import base, nmea, ipositioning +from twisted.trial.unittest import TestCase + +from twisted.positioning.base import LATITUDE, LONGITUDE + +# Sample sentences +GPGGA = '$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47' +GPRMC = '$GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*6A' +GPGSA = '$GPGSA,A,3,19,28,14,18,27,22,31,39,,,,,1.7,1.0,1.3*34' +GPHDT = '$GPHDT,038.005,T*3B' +GPGLL = '$GPGLL,4916.45,N,12311.12,W,225444,A*31' +GPGLL_PARTIAL = '$GPGLL,3751.65,S,14507.36,E*77' + +GPGSV_SINGLE = '$GPGSV,1,1,11,03,03,111,00,04,15,270,00,06,01,010,00,,,,*4b' +GPGSV_EMPTY_MIDDLE = '$GPGSV,1,1,11,03,03,111,00,,,,,,,,,13,06,292,00*75' +GPGSV_SEQ = GPGSV_FIRST, GPGSV_MIDDLE, GPGSV_LAST = """ +$GPGSV,3,1,11,03,03,111,00,04,15,270,00,06,01,010,00,13,06,292,00*74 +$GPGSV,3,2,11,14,25,170,00,16,57,208,39,18,67,296,40,19,40,246,00*74 +$GPGSV,3,3,11,22,42,067,42,24,14,311,43,27,05,244,00,,,,*4D +""".split() + + + +class NMEATestReceiver(object): + """ + An NMEA receiver for testing. + + Remembers the last sentence it has received. + """ + implements(ipositioning.INMEAReceiver) + + def __init__(self): + self.clear() + + + def clear(self): + """ + Forgets the received sentence (if any), by setting + C{self.receivedSentence} to C{None}. + """ + self.receivedSentence = None + + + def sentenceReceived(self, sentence): + self.receivedSentence = sentence + + + +class NMEACallbackTestProtocol(nmea.NMEAProtocol): + """ + An NMEA protocol with a bunch of callbacks that remembers when + those callbacks have been called. + """ + def __init__(self): + nmea.NMEAProtocol.__init__(self, None) + + for sentenceType in nmea.NMEAProtocol.SENTENCE_CONTENTS: + self._createCallback(sentenceType) + + self.clear() + + + def clear(self): + """ + Forgets all of the called methods, by setting C{self.called} to + C{None}. + """ + self.called = {} + + + SENTENCE_TYPES = list(nmea.NMEAProtocol.SENTENCE_CONTENTS) + + + def _createCallback(self, sentenceType): + """ + Creates a callback for an NMEA sentence. + """ + def callback(sentence): + self.called[sentenceType] = True + + setattr(self, "nmea_" + sentenceType, callback) + + + +class CallbackTests(TestCase): + """ + Tests if callbacks on NMEA protocols are correctly called. + """ + def setUp(self): + self.callbackProtocol = NMEACallbackTestProtocol() + + + def test_callbacksCalled(self): + """ + Tests that the correct callbacks fire, and that *only* those fire. + """ + sentencesByType = {'GPGGA': ['$GPGGA*56'], + 'GPGLL': ['$GPGLL*50'], + 'GPGSA': ['$GPGSA*42'], + 'GPGSV': ['$GPGSV*55'], + 'GPHDT': ['$GPHDT*4f'], + 'GPRMC': ['$GPRMC*4b']} + + for calledSentenceType in sentencesByType: + for sentence in sentencesByType[calledSentenceType]: + self.callbackProtocol.lineReceived(sentence) + called = self.callbackProtocol.called + + for sentenceType in NMEACallbackTestProtocol.SENTENCE_TYPES: + if sentenceType == calledSentenceType: + self.assertEquals(called[sentenceType], True) + else: + self.assertNotIn(sentenceType, called) + + self.callbackProtocol.clear() + + + +class SplitTest(TestCase): + """ + Checks splitting of NMEA sentences. + """ + def test_withChecksum(self): + """ + Tests that an NMEA sentence with a checksum gets split correctly. + """ + splitSentence = nmea.split("$GPGGA,spam,eggs*00") + self.assertEqual(splitSentence, ['GPGGA', 'spam', 'eggs']) + + + def test_noCheckum(self): + """ + Tests that an NMEA sentence without a checksum gets split correctly. + """ + splitSentence = nmea.split("$GPGGA,spam,eggs*") + self.assertEqual(splitSentence, ['GPGGA', 'spam', 'eggs']) + + + +class ChecksumTests(TestCase): + """ + NMEA sentence checksum verification tests. + """ + def test_valid(self): + """ + Tests checkum validation for valid or missing checksums. + """ + sentences = [GPGGA, GPGGA[:-2]] + + for s in sentences: + nmea.validateChecksum(s) + + + def test_invalid(self): + """ + Tests checksum validation on invalid checksums. + """ + bareSentence, checksum = GPGGA.split("*") + badChecksum = "%x" % (int(checksum, 16) + 1) + sentences = ["%s*%s" % (bareSentence, badChecksum)] + + for s in sentences: + self.assertRaises(base.InvalidChecksum, nmea.validateChecksum, s) + + + +class NMEAReceiverSetup: + """ + A mixin for tests that need an NMEA receiver (and a protocol attached to + it). + + @ivar receiver: An NMEA receiver that remembers the last sentence. + @type receiver: L{NMEATestReceiver} + + @ivar protocol: An NMEA protocol attached to the receiver. + @type protocol: L{twisted.positioning.nmea.NMEAProtocol} + """ + def setUp(self): + self.receiver = NMEATestReceiver() + self.protocol = nmea.NMEAProtocol(self.receiver) + + + +class GSVSequenceTests(NMEAReceiverSetup, TestCase): + """ + Tests if GSV sentence sequences are identified correctly. + """ + def test_firstSentence(self): + """ + Tests if the last sentence in a GSV sequence is correctly identified. + """ + self.protocol.lineReceived(GPGSV_FIRST) + sentence = self.receiver.receivedSentence + + self.assertTrue(sentence._isFirstGSVSentence()) + self.assertFalse(sentence._isLastGSVSentence()) + + + def test_middleSentence(self): + """ + Tests if a sentence in the middle of a GSV sequence is correctly + identified (as being neither the last nor the first). + """ + self.protocol.lineReceived(GPGSV_MIDDLE) + sentence = self.receiver.receivedSentence + + self.assertFalse(sentence._isFirstGSVSentence()) + self.assertFalse(sentence._isLastGSVSentence()) + + + def test_lastSentence(self): + """ + Tests if the last sentence in a GSV sequence is correctly identified. + """ + self.protocol.lineReceived(GPGSV_LAST) + sentence = self.receiver.receivedSentence + + self.assertFalse(sentence._isFirstGSVSentence()) + self.assertTrue(sentence._isLastGSVSentence()) + + + +class BogusSentenceTests(NMEAReceiverSetup, TestCase): + """ + Tests for verifying predictable failure for bogus NMEA sentences. + """ + def assertRaisesOnSentence(self, exceptionClass, sentence): + """ + Asserts that the protocol raises C{exceptionClass} when it receives + C{sentence}. + + @param exceptionClass: The exception class expected to be raised. + @type exceptionClass: C{Exception} subclass + + @param sentence: The (bogus) NMEA sentence. + @type sentence: C{str} + """ + self.assertRaises(exceptionClass, self.protocol.lineReceived, sentence) + + + def test_raiseOnUnknownSentenceType(self): + """ + Tests that the protocol raises C{ValueError} when you feed it a + well-formed sentence of unknown type. + """ + self.assertRaisesOnSentence(ValueError, "$GPBOGUS*5b") + + + def test_raiseOnMalformedSentences(self): + """ + Tests that the protocol raises L{base.InvalidSentence} when you feed + it a malformed sentence. + """ + self.assertRaisesOnSentence(base.InvalidSentence, "GPBOGUS") + + + +class NMEASentenceTests(NMEAReceiverSetup, TestCase): + """ + Tests for L{nmea.NMEASentence} objects. + """ + def test_repr(self): + """ + Checks that the C{repr} of L{nmea.NMEASentence} objects is + predictable. + """ + sentencesWithExpectedRepr = [ + (GPGSA, + ""), + ] + + for sentence, repr_ in sentencesWithExpectedRepr: + self.protocol.lineReceived(sentence) + received = self.receiver.receivedSentence + self.assertEquals(repr(received), repr_) + + + +class ParsingTests(NMEAReceiverSetup, TestCase): + """ + Tests if raw NMEA sentences get parsed correctly. + + This doesn't really involve any interpretation, just turning ugly raw NMEA + representations into objects that are more pleasant to work with. + """ + def _parserTest(self, sentence, expected): + """ + Passes a sentence to the protocol and gets the parsed sentence from + the receiver. Then verifies that the parsed sentence contains the + expected data. + """ + self.protocol.lineReceived(sentence) + received = self.receiver.receivedSentence + self.assertEquals(expected, received._sentenceData) + + + def test_fullRMC(self): + """ + Tests that a full RMC sentence is correctly parsed. + """ + expected = { + 'type': 'GPRMC', + 'latitudeFloat': '4807.038', + 'latitudeHemisphere': 'N', + 'longitudeFloat': '01131.000', + 'longitudeHemisphere': 'E', + 'magneticVariation': '003.1', + 'magneticVariationDirection': 'W', + 'speedInKnots': '022.4', + 'timestamp': '123519', + 'datestamp': '230394', + 'trueHeading': '084.4', + 'dataMode': 'A', + } + self._parserTest(GPRMC, expected) + + + def test_fullGGA(self): + """ + Tests that a full GGA sentence is correctly parsed. + """ + expected = { + 'type': 'GPGGA', + + 'altitude': '545.4', + 'altitudeUnits': 'M', + 'heightOfGeoidAboveWGS84': '46.9', + 'heightOfGeoidAboveWGS84Units': 'M', + + 'horizontalDilutionOfPrecision': '0.9', + + 'latitudeFloat': '4807.038', + 'latitudeHemisphere': 'N', + 'longitudeFloat': '01131.000', + 'longitudeHemisphere': 'E', + + 'numberOfSatellitesSeen': '08', + 'timestamp': '123519', + 'fixQuality': '1', + } + self._parserTest(GPGGA, expected) + + + def test_fullGLL(self): + """ + Tests that a full GLL sentence is correctly parsed. + """ + expected = { + 'type': 'GPGLL', + + 'latitudeFloat': '4916.45', + 'latitudeHemisphere': 'N', + 'longitudeFloat': '12311.12', + 'longitudeHemisphere': 'W', + + 'timestamp': '225444', + 'dataMode': 'A', + } + self._parserTest(GPGLL, expected) + + + def test_partialGLL(self): + """ + Tests that a partial GLL sentence is correctly parsed. + """ + expected = { + 'type': 'GPGLL', + + 'latitudeFloat': '3751.65', + 'latitudeHemisphere': 'S', + 'longitudeFloat': '14507.36', + 'longitudeHemisphere': 'E', + } + self._parserTest(GPGLL_PARTIAL, expected) + + + def test_fullGSV(self): + """ + Tests that a full GSV sentence is correctly parsed. + """ + expected = { + 'type': 'GPGSV', + 'GSVSentenceIndex': '1', + 'numberOfGSVSentences': '3', + 'numberOfSatellitesSeen': '11', + + 'azimuth_0': '111', + 'azimuth_1': '270', + 'azimuth_2': '010', + 'azimuth_3': '292', + + 'elevation_0': '03', + 'elevation_1': '15', + 'elevation_2': '01', + 'elevation_3': '06', + + 'satellitePRN_0': '03', + 'satellitePRN_1': '04', + 'satellitePRN_2': '06', + 'satellitePRN_3': '13', + + 'signalToNoiseRatio_0': '00', + 'signalToNoiseRatio_1': '00', + 'signalToNoiseRatio_2': '00', + 'signalToNoiseRatio_3': '00', + } + self._parserTest(GPGSV_FIRST, expected) + + + def test_partialGSV(self): + """ + Tests that a partial GSV sentence is correctly parsed. + """ + expected = { + 'type': 'GPGSV', + 'GSVSentenceIndex': '3', + 'numberOfGSVSentences': '3', + 'numberOfSatellitesSeen': '11', + + 'azimuth_0': '067', + 'azimuth_1': '311', + 'azimuth_2': '244', + + 'elevation_0': '42', + 'elevation_1': '14', + 'elevation_2': '05', + + 'satellitePRN_0': '22', + 'satellitePRN_1': '24', + 'satellitePRN_2': '27', + + 'signalToNoiseRatio_0': '42', + 'signalToNoiseRatio_1': '43', + 'signalToNoiseRatio_2': '00', + } + self._parserTest(GPGSV_LAST, expected) + + + def test_fullHDT(self): + """ + Tests that a full HDT sentence is correctly parsed. + """ + expected = { + 'type': 'GPHDT', + 'trueHeading': '038.005', + } + self._parserTest(GPHDT, expected) + + + def test_typicalGSA(self): + """ + Tests that a typical GSA sentence is correctly parsed. + """ + expected = { + 'type': 'GPGSA', + + 'dataMode': 'A', + 'fixType': '3', + + 'usedSatellitePRN_0': '19', + 'usedSatellitePRN_1': '28', + 'usedSatellitePRN_2': '14', + 'usedSatellitePRN_3': '18', + 'usedSatellitePRN_4': '27', + 'usedSatellitePRN_5': '22', + 'usedSatellitePRN_6': '31', + 'usedSatellitePRN_7': '39', + + 'positionDilutionOfPrecision': '1.7', + 'horizontalDilutionOfPrecision': '1.0', + 'verticalDilutionOfPrecision': '1.3', + } + self._parserTest(GPGSA, expected) + + + +class FixerTestMixin: + """ + Mixin for tests for the fixers on L{nmea.NMEAAdapter} that adapt + from NMEA-specific notations to generic Python objects. + + @ivar adapter: The NMEA adapter. + @type adapter: L{nmea.NMEAAdapter} + """ + def setUp(self): + self.adapter = nmea.NMEAAdapter(base.BasePositioningReceiver()) + + + def _fixerTest(self, sentenceData, expected=None, exceptionClass=None): + """ + A generic adapter fixer test. + + Creates a sentence from the C{sentenceData} and sends that to the + adapter. If C{exceptionClass} is not passed, this is assumed to work, + and C{expected} is compared with the adapter's internal state. + Otherwise, passing the sentence to the adapter is checked to raise + C{exceptionClass}. + + @param sentenceData: Raw sentence content. + @type sentenceData: C{dict} mapping C{str} to C{str} + + @param expected: The expected state of the adapter. + @type expected: C{dict} or C{None} + + @param exceptionClass: The exception to be raised by the adapter. + @type exceptionClass: subclass of C{Exception} + """ + sentence = nmea.NMEASentence(sentenceData) + def receiveSentence(): + self.adapter.sentenceReceived(sentence) + + if exceptionClass is None: + receiveSentence() + self.assertEquals(self.adapter._state, expected) + else: + self.assertRaises(exceptionClass, receiveSentence) + + self.adapter.clear() + + + +class TimestampFixerTests(FixerTestMixin, TestCase): + """ + Tests conversion from NMEA timestamps to C{datetime.time} objects. + """ + def test_simple(self): + """ + Tests that a simple timestamp is converted correctly. + """ + data = {'timestamp': '123456'} # 12:34:56Z + expected = {'_time': datetime.time(12, 34, 56)} + self._fixerTest(data, expected) + + + def test_broken(self): + """ + Tests that a broken timestamp raises C{ValueError}. + """ + badTimestamps = '993456', '129956', '123499' + + for t in badTimestamps: + self._fixerTest({'timestamp': t}, exceptionClass=ValueError) + + + +class DatestampFixerTests(FixerTestMixin, TestCase): + def test_intelligent(self): + """ + Tests "intelligent" datestamp handling (guess century based on last + two digits). Also tests that this is the default. + """ + self.assertEqual(self.adapter.DATESTAMP_HANDLING, + self.adapter.INTELLIGENT_DATESTAMPS) + + datestring, date = '010199', datetime.date(1999, 1, 1) + self._fixerTest({'datestamp': datestring}, {'_date': date}) + + datestring, date = '010109', datetime.date(2009, 1, 1) + self._fixerTest({'datestamp': datestring}, {'_date': date}) + + + def test_19xx(self): + """ + Tests 20th-century-only datestam handling method. + """ + self.adapter.DATESTAMP_HANDLING = self.adapter.DATESTAMPS_FROM_19XX + + datestring, date = '010199', datetime.date(1999, 1, 1) + self._fixerTest({'datestamp': datestring}, {'_date': date}) + + datestring, date = '010109', datetime.date(1909, 1, 1) + self._fixerTest({'datestamp': datestring}, {'_date': date}) + + + def test_20xx(self): + """ + Tests 21st-century-only datestam handling method. + """ + self.adapter.DATESTAMP_HANDLING = self.adapter.DATESTAMPS_FROM_20XX + + datestring, date = '010199', datetime.date(2099, 1, 1) + self._fixerTest({'datestamp': datestring}, {'_date': date}) + + datestring, date = '010109', datetime.date(2009, 1, 1) + self._fixerTest({'datestamp': datestring}, {'_date': date}) + + + def test_bogusMethod(self): + """ + Tests that using a nonexistent datestamp handling method raises C{ValueError}. + """ + self.adapter.DATESTAMP_HANDLING = "BOGUS_VALUE" + self._fixerTest({'datestamp': '010199'}, exceptionClass=ValueError) + + + def test_broken(self): + """ + Tests that a broken datestring raises C{ValueError}. + """ + self._fixerTest({'datestamp': '123456'}, exceptionClass=ValueError) + + + +def _nmeaFloat(degrees, minutes): + """ + Builds an NMEA float representation for a given angle in degrees and + decimal minutes. + + @param degrees: The integer degrees for this angle. + @type degrees: C{int} + @param minutes: The decimal minutes value for this angle. + @type minutes: C{float} + @return: The NMEA float representation for this angle. + @rtype: C{str} + """ + return "%i%0.3f" % (degrees, minutes) + + +def _coordinateSign(hemisphere): + """ + Return the sign of a coordinate. + + This is C{1} if the coordinate is in the northern or eastern hemispheres, + C{-1} otherwise. + + @param hemisphere: NMEA shorthand for the hemisphere. One of "NESW". + @type hemisphere: C{str} + + @return: The sign of the coordinate value. + @rtype: C{int} + """ + return 1 if hemisphere in "NE" else -1 + + +def _coordinateType(hemisphere): + """ + Return the type of a coordinate. + + This is L{LATITUDE} if the coordinate is in the northern or southern + hemispheres, L{LONGITUDE} otherwise. + + @param hemisphere: NMEA shorthand for the hemisphere. One of "NESW". + @type hemisphere: C{str} + + @return: The type of the coordinate (L{LATITUDE} or L{LONGITUDE}) + """ + return LATITUDE if hemisphere in "NS" else LONGITUDE + + + +class CoordinateFixerTests(FixerTestMixin, TestCase): + """ + Tests turning NMEA coordinate notations into something more pleasant. + """ + def _coordinateFixerTest(self, degrees, minutes, hemisphere): + """ + Tests that an NMEA representation of a coordinate at the given + location converts correctly into a L{base.Coordinate}. + """ + coordinateType = _coordinateType(hemisphere) + if coordinateType is LATITUDE: + typeName = "latitude" + else: + typeName = "longitude" + + sentenceData = {"%sFloat" % typeName: _nmeaFloat(degrees, minutes), + "%sHemisphere" % typeName: hemisphere} + + coordinateValue = _coordinateSign(hemisphere)*(degrees + minutes/60) + coordinate = base.Coordinate(coordinateValue, coordinateType) + + self._fixerTest(sentenceData, {typeName: coordinate}) + + + def test_north(self): + """ + Tests that NMEA coordinate representations in the northern hemisphere + convert correctly. + """ + self._coordinateFixerTest(10, 30.0, "N") + + + def test_south(self): + """ + Tests that NMEA coordinate representations in the southern hemisphere + convert correctly. + """ + self._coordinateFixerTest(45, 12.145, "S") + + + def test_east(self): + """ + Tests that NMEA coordinate representations in the eastern hemisphere + convert correctly. + """ + self._coordinateFixerTest(53, 31.513, "E") + + + def test_west(self): + """ + Tests that NMEA coordinate representations in the western hemisphere + convert correctly. + """ + self._coordinateFixerTest(12, 45.120, "W") + + + def test_badHemisphere(self): + """ + Tests that NMEA coordinate representations for nonexistent hemispheres + raise C{ValueError} when you attempt to parse them. + """ + sentenceData = {'longitudeHemisphere': 'Q'} + self._fixerTest(sentenceData, exceptionClass=ValueError) + + + def test_badHemisphereSign(self): + """ + Tests that NMEA coordinate repesentation parsing fails predictably + when you pass nonexistent coordinate types (not latitude or + longitude). + """ + getSign = lambda: self.adapter._getHemisphereSign("BOGUS_VALUE") + self.assertRaises(ValueError, getSign) + + + +class AltitudeFixerTests(FixerTestMixin, TestCase): + """ + Tests that NMEA representations of altitudes are correctly converted. + """ + def test_fixAltitude(self): + """ + Tests that the NMEA representation of an altitude (above mean sea + level) is correctly converted. + """ + key, value = 'altitude', '545.4' + altitude = base.Altitude(float(value)) + self._fixerTest({key: value}, {key: altitude}) + + + def test_heightOfGeoidAboveWGS84(self): + """ + Tests that the NMEA representation of an altitude of the geoid (above + the WGS84 reference level) is correctly converted. + """ + key, value = 'heightOfGeoidAboveWGS84', '46.9' + altitude = base.Altitude(float(value)) + self._fixerTest({key: value}, {key: altitude}) + + + +class SpeedFixerTests(FixerTestMixin, TestCase): + """ + Tests that NMEA representations of speeds are correctly converted. + """ + def test_speedInKnots(self): + """ + Tests if speeds reported in knots correctly get converted to + meters per second. + """ + key, value, targetKey = "speedInKnots", "10", "speed" + speed = base.Speed(float(value) * base.MPS_PER_KNOT) + self._fixerTest({key: value}, {targetKey: speed}) + + + +class VariationFixerTests(FixerTestMixin, TestCase): + """ + Tests if the absolute values of magnetic variations on the heading + and their sign get combined correctly, and if that value gets + combined with a heading correctly. + """ + def test_west(self): + """ + Tests westward (negative) magnetic variation. + """ + variation, direction = "1.34", "W" + heading = base.Heading.fromFloats(variationValue=-1*float(variation)) + sentenceData = {'magneticVariation': variation, + 'magneticVariationDirection': direction} + + self._fixerTest(sentenceData, {'heading': heading}) + + + def test_east(self): + """ + Tests eastward (positive) magnetic variation. + """ + variation, direction = "1.34", "E" + heading = base.Heading.fromFloats(variationValue=float(variation)) + sentenceData = {'magneticVariation': variation, + 'magneticVariationDirection': direction} + + self._fixerTest(sentenceData, {'heading': heading}) + + + def test_withHeading(self): + """ + Tests if variation values get combined with headings correctly. + """ + trueHeading, variation, direction = "123.12", "1.34", "E" + sentenceData = {'trueHeading': trueHeading, + 'magneticVariation': variation, + 'magneticVariationDirection': direction} + heading = base.Heading.fromFloats(float(trueHeading), + variationValue=float(variation)) + self._fixerTest(sentenceData, {'heading': heading}) + + + +class PositionErrorFixerTests(FixerTestMixin, TestCase): + """ + Position errors in NMEA are passed as dilutions of precision (DOP). This + is a measure relative to some specified value of the GPS device as its + "reference" precision. Unfortunately, there are very few ways of figuring + this out from just the device (sans manual). + + There are two basic DOP values: vertical and horizontal. HDOP tells you + how precise your location is on the face of the earth (pretending it's + flat, at least locally). VDOP tells you how precise your altitude is + known. PDOP (position DOP) is a dependent value defined as the Nuclidean + norm of those two, and gives you a more generic "goodness of fix" value. + """ + def test_simple(self): + self._fixerTest( + {'horizontalDilutionOfPrecision': '11'}, + {'positionError': base.PositionError(hdop=11.)}) + + + def test_mixing(self): + pdop, hdop, vdop = "1", "1", "1" + positionError = base.PositionError(pdop=float(pdop), + hdop=float(hdop), + vdop=float(vdop)) + sentenceData = {'positionDilutionOfPrecision': pdop, + 'horizontalDilutionOfPrecision': hdop, + 'verticalDilutionOfPrecision': vdop} + self._fixerTest(sentenceData, {"positionError": positionError}) + + +class ValidFixTests(FixerTestMixin, TestCase): + """ + Tests that data reported from a valid fix is used. + """ + def test_GGA(self): + """ + Tests that GGA data with a valid fix is used. + """ + sentenceData = {'type': 'GPGGA', + 'altitude': '545.4', + 'fixQuality': nmea.GGA_GPS_FIX} + expectedState = {'altitude': base.Altitude(545.4)} + + self._fixerTest(sentenceData, expectedState) + + + def test_GLL(self): + """ + Tests that GLL data with a valid data mode is used. + """ + sentenceData = {'type': 'GPGLL', + 'altitude': '545.4', + 'dataMode': nmea.DATA_ACTIVE} + expectedState = {'altitude': base.Altitude(545.4)} + + self._fixerTest(sentenceData, expectedState) + + + +class InvalidFixTests(FixerTestMixin, TestCase): + """ + Tests that data being reported from a bad or incomplete fix isn't + used. Although the specification dictates that GPSes shouldn't produce + NMEA sentences with real-looking values for altitude or position in them + unless they have at least some semblance of a GPS fix, this is widely + ignored. + """ + def _invalidFixTest(self, sentenceData): + """ + Tests that sentences with an invalid fix or data mode result in empty + state (ie, the data isn't used). + """ + self._fixerTest(sentenceData, {}) + + + def test_GGA(self): + """ + Tests that GGA sentence data is unused when there is no fix. + """ + sentenceData = {'type': 'GPGGA', + 'altitude': '545.4', + 'fixQuality': nmea.GGA_INVALID_FIX} + + self._invalidFixTest(sentenceData) + + + def test_GLL(self): + """ + Tests that GLL sentence data is unused when the data is flagged as + void. + """ + sentenceData = {'type': 'GPGLL', + 'altitude': '545.4', + 'dataMode': nmea.DATA_VOID} + + self._invalidFixTest(sentenceData) + + + def test_badGSADataMode(self): + """ + Tests that GSA sentence data is not used when there is no GPS fix, but + the data mode claims the data is "active". Some GPSes do do this, + unfortunately, and that means you shouldn't use the data. + """ + sentenceData = {'type': 'GPGSA', + 'altitude': '545.4', + 'dataMode': nmea.DATA_ACTIVE, + 'fixType': nmea.GSA_NO_FIX} + self._invalidFixTest(sentenceData) + + + def test_badGSAFixType(self): + """ + Tests that GSA sentence data is not used when the fix claims to be + valid (albeit only 2D), but the data mode says the data is void. Some + GPSes do do this, unfortunately, and that means you shouldn't use the + data. + """ + sentenceData = {'type': 'GPGSA', + 'altitude': '545.4', + 'dataMode': nmea.DATA_VOID, + 'fixType': nmea.GSA_2D_FIX} + self._invalidFixTest(sentenceData) + + + def test_badGSADataModeAndFixType(self): + """ + Tests that GSA sentence data is not use when neither the fix nor the + data mode is any good. + """ + sentenceData = {'type': 'GPGSA', + 'altitude': '545.4', + 'dataMode': nmea.DATA_VOID, + 'fixType': nmea.GSA_NO_FIX} + self._invalidFixTest(sentenceData) + + + +class MockNMEAReceiver(base.BasePositioningReceiver): + """ + A mock NMEA receiver. + + Mocks all the L{IPositioningReceiver} methods with stubs that don't do + anything but register that they were called. + """ + def __init__(self): + self.clear() + + for methodName in ipositioning.IPositioningReceiver: + self._addCallback(methodName) + + + def clear(self): + """ + Forget all the methods that have been called on this receiver, by + emptying C{self.called}. + """ + self.called = {} + + + def _addCallback(self, name): + def callback(*a, **kw): + self.called[name] = True + + setattr(self, name, callback) + + + +class NMEAReceiverTest(TestCase): + """ + Tests for the NMEA receiver. + """ + def setUp(self): + self.receiver = MockNMEAReceiver() + self.adapter = nmea.NMEAAdapter(self.receiver) + self.protocol = nmea.NMEAProtocol(self.adapter) + + + def _receiverTest(self, sentences, expectedFired=(), extraTest=None): + """ + A generic test for NMEA receiver behavior. + + @param sentences: The sequence of sentences to simulate receiving. + @type sentences: iterable of C{str} + @param expectedFired: The names of the callbacks expected to fire. + @type expectedFired: iterable of C{str} + @param extraTest: An optional extra test hook. + @type extraTest: nullary callable + """ + for sentence in sentences: + self.protocol.lineReceived(sentence) + + actuallyFired = self.receiver.called.keys() + self.assertEquals(set(actuallyFired), set(expectedFired)) + + if extraTest is not None: + extraTest() + + self.receiver.clear() + self.adapter.clear() + + + def test_positionErrorUpdateAcrossStates(self): + """ + Tests that the positioning error is updated across multiple states. + """ + sentences = [GPGSA] + GPGSV_SEQ + callbacksFired = ['positionErrorReceived', 'beaconInformationReceived'] + + def checkBeaconInformation(): + beaconInformation = self.adapter._state['beaconInformation'] + self.assertEqual(beaconInformation.seen, 11) + self.assertEqual(beaconInformation.used, 5) + + self._receiverTest(sentences, callbacksFired, checkBeaconInformation) + + + def test_emptyMiddleGSV(self): + """ + Tests that a GSV sentence with empty entries in any position + does not mean the entries in subsequent positions are ignored. + """ + sentences = [GPGSV_EMPTY_MIDDLE] + callbacksFired = ['beaconInformationReceived'] + + def checkBeaconInformation(): + beaconInformation = self.adapter._state['beaconInformation'] + self.assertEqual(beaconInformation.seen, 2) + prns = [satellite.identifier for satellite in beaconInformation] + self.assertIn(13, prns) + + self._receiverTest(sentences, callbacksFired, checkBeaconInformation) + + def test_GGASentences(self): + """ + Tests that a sequence of GGA sentences fires C{positionReceived}, + C{positionErrorReceived} and C{altitudeReceived}. + """ + sentences = [GPGGA] + callbacksFired = ['positionReceived', + 'positionErrorReceived', + 'altitudeReceived'] + + self._receiverTest(sentences, callbacksFired) + + + def test_RMCSentences(self): + """ + Tests that a sequence of RMC sentences fires C{positionReceived}, + C{speedReceived}, C{headingReceived} and C{timeReceived}. + """ + sentences = [GPRMC] + callbacksFired = ['headingReceived', + 'speedReceived', + 'positionReceived', + 'timeReceived'] + + self._receiverTest(sentences, callbacksFired) + + + def test_GSVSentences(self): + """ + Verifies that a complete sequence of GSV sentences fires + C{beaconInformationReceived}. + """ + sentences = [GPGSV_FIRST, GPGSV_MIDDLE, GPGSV_LAST] + callbacksFired = ['beaconInformationReceived'] + + def checkPartialInformation(): + self.assertNotIn('_partialBeaconInformation', self.adapter._state) + + self._receiverTest(sentences, callbacksFired, checkPartialInformation) + + + def test_emptyMiddleEntriesGSVSequence(self): + """ + Verifies that a complete sequence of GSV sentences with empty entries + in the middle still fires C{beaconInformationReceived}. + """ + sentences = [GPGSV_EMPTY_MIDDLE] + self._receiverTest(sentences, ["beaconInformationReceived"]) + + + def test_incompleteGSVSequence(self): + """ + Verifies that an incomplete sequence of GSV sentences does not fire. + """ + sentences = [GPGSV_FIRST] + self._receiverTest(sentences) + + + def test_singleSentenceGSVSequence(self): + """ + Verifies that the parser does not fail badly when the sequence consists + of only one sentence (but is otherwise complete). + """ + sentences = [GPGSV_SINGLE] + self._receiverTest(sentences, ["beaconInformationReceived"]) + + + def test_GLLSentences(self): + """ + Verfies that GLL sentences fire C{positionReceived}. + """ + sentences = [GPGLL_PARTIAL, GPGLL] + self._receiverTest(sentences, ['positionReceived']) + + + def test_HDTSentences(self): + """ + Verfies that HDT sentences fire C{headingReceived}. + """ + sentences = [GPHDT] + self._receiverTest(sentences, ['headingReceived']) + + + def test_mixedSentences(self): + """ + Verifies that a mix of sentences fires the correct callbacks. + """ + sentences = [GPRMC, GPGGA] + callbacksFired = ['altitudeReceived', + 'speedReceived', + 'positionReceived', + 'positionErrorReceived', + 'timeReceived', + 'headingReceived'] + + def checkTime(): + expectedDateTime = datetime.datetime(1994, 3, 23, 12, 35, 19) + self.assertEquals(self.adapter._state['time'], expectedDateTime) + + self._receiverTest(sentences, callbacksFired, checkTime) + + + def test_lotsOfMixedSentences(self): + """ + Tests for an entire gamut of sentences. These are more than you'd + expect from your average consumer GPS device. They have most of the + important information, including beacon information and visibility. + """ + sentences = [GPGSA] + GPGSV_SEQ + [GPRMC, GPGGA, GPGLL] + + callbacksFired = ['headingReceived', + 'beaconInformationReceived', + 'speedReceived', + 'positionReceived', + 'timeReceived', + 'altitudeReceived', + 'positionErrorReceived'] + + self._receiverTest(sentences, callbacksFired) diff --git c/twisted/positioning/test/test_sentence.py w/twisted/positioning/test/test_sentence.py new file mode 100644 index 0000000..25d0474 --- /dev/null +++ w/twisted/positioning/test/test_sentence.py @@ -0,0 +1,162 @@ +# Copyright (c) 2009-2011 Twisted Matrix Laboratories. +# See LICENSE for details. +""" +Tests for positioning sentences. +""" +import itertools +from zope.interface import classProvides + +from twisted.positioning import base, ipositioning +from twisted.trial.unittest import TestCase + + +sentinelValueOne = "someStringValue" +sentinelValueTwo = "someOtherStringValue" + + + +class DummyProtocol(object): + """ + A simple, fake protocol. + """ + classProvides(ipositioning.IPositioningSentenceProducer) + + @staticmethod + def getSentenceAttributes(): + return ["type", sentinelValueOne, sentinelValueTwo] + + + +class DummySentence(base.BaseSentence): + """ + A sentence for L{DummyProtocol}. + """ + ALLOWED_ATTRIBUTES = DummyProtocol.getSentenceAttributes() + + + +class MixinProtocol(base.PositioningSentenceProducerMixin): + """ + A simple, fake protocol that declaratively tells you the sentences + it produces using L{base.PositioningSentenceProducerMixin}. + """ + SENTENCE_CONTENTS = { + None: [ + sentinelValueOne, + sentinelValueTwo, + None # see MixinTests.test_noNoneInSentenceAttributes + ], + } + + + +class MixinSentence(base.BaseSentence): + """ + A sentence for L{MixinProtocol}. + """ + ALLOWED_ATTRIBUTES = MixinProtocol.getSentenceAttributes() + + + +class SentenceTestsMixin: + """ + Tests for positioning protocols and their respective sentences. + """ + def test_attributeAccess(self): + """ + Tests that accessing a sentence attribute gets the correct value, and + accessing an unset attribute (which is specified as being a valid + sentence attribute) gets C{None}. + """ + thisSentinel = object() + sentence = self.sentenceClass({sentinelValueOne: thisSentinel}) + self.assertEquals(getattr(sentence, sentinelValueOne), thisSentinel) + self.assertEquals(getattr(sentence, sentinelValueTwo), None) + + + def test_raiseOnMissingAttributeAccess(self): + """ + Tests that accessing a nonexistant attribute raises C{AttributeError}. + """ + sentence = self.sentenceClass({}) + self.assertRaises(AttributeError, getattr, sentence, "BOGUS") + + + def test_raiseOnBadAttributeAccess(self): + """ + Tests that accessing bogus attributes raises C{AttributeError}, *even* + when that attribute actually is in the sentence data. + """ + sentence = self.sentenceClass({"BOGUS": None}) + self.assertRaises(AttributeError, getattr, sentence, "BOGUS") + + + sentenceType = "tummies" + reprTemplate = "<%s (%s) {%s}>" + + + def _expectedRepr(self, sentenceType="unknown type", dataRepr=""): + """ + Builds the expected repr for a sentence. + """ + clsName = self.sentenceClass.__name__ + return self.reprTemplate % (clsName, sentenceType, dataRepr) + + + def test_unknownTypeRepr(self): + """ + Test the repr of an empty sentence of unknown type. + """ + sentence = self.sentenceClass({}) + expectedRepr = self._expectedRepr() + self.assertEqual(repr(sentence), expectedRepr) + + + def test_knownTypeRepr(self): + """ + Test the repr of an empty sentence of known type. + """ + sentence = self.sentenceClass({"type": self.sentenceType}) + expectedRepr = self._expectedRepr(self.sentenceType) + self.assertEqual(repr(sentence), expectedRepr) + + + +class DummyTests(TestCase, SentenceTestsMixin): + """ + Tests for protocol classes that implement the appropriate interface + (L{ipositioning.IPositioningSentenceProducer}) manually. + """ + def setUp(self): + self.protocol = DummyProtocol() + self.sentenceClass = DummySentence + + + +class MixinTests(TestCase, SentenceTestsMixin): + """ + Tests for protocols deriving from L{base.PositioningSentenceProducerMixin} + and their sentences. + """ + def setUp(self): + self.protocol = MixinProtocol() + self.sentenceClass = MixinSentence + + + def test_noNoneInSentenceAttributes(self): + """ + Tests that C{None} does not appear in the sentence attributes of the + protocol, even though it's in the specification. + + This is because C{None} is a placeholder for parts of the sentence you + don't really need or want, but there are some bits later on in the + sentence that you do want. The alternative would be to have to specify + things like "_UNUSED0", "_UNUSED1"... which would end up cluttering + the sentence data and eventually adapter state. + """ + sentenceAttributes = self.protocol.getSentenceAttributes() + self.assertNotIn(None, sentenceAttributes) + + sentenceContents = self.protocol.SENTENCE_CONTENTS + sentenceSpecAttributes = itertools.chain(*sentenceContents.values()) + self.assertIn(None, sentenceSpecAttributes)