=== added directory 'twisted/positioning' === added file 'twisted/positioning/__init__.py' === added file 'twisted/positioning/base.py' --- twisted/positioning/base.py 1970-01-01 00:00:00 +0000 +++ twisted/positioning/base.py 2009-08-30 10:59:31 +0000 @@ -0,0 +1,779 @@ +# -*- encoding: utf-8 -*- +# -*- test-case-name: twisted.positioning.test.test_base -*- +# Copyright (c) 2009 Twisted Matrix Laboratories. +# See LICENSE for details. +""" +Generic positioning base classes. +""" +from zope.interface import implements +from twisted.positioning import ipositioning + +MPS_PER_KNOT = 0.5144444444444444 +MPS_PER_KPH = 0.27777777777777777 +METERS_PER_FOOT = 0.3048 + + +class BasePositioningReceiver(object): + implements(ipositioning.IPositioningReceiver) + def headingReceived(self, heading): + """ + Implements C{IPositioningReceiver.headingReceived} stub. + """ + + + def speedReceived(self, speed): + """ + Implements C{IPositioningReceiver.speedReceived} stub. + """ + + + def climbReceived(self, climb): + """ + Implements C{IPositioningReceiver.climbReceived} stub. + """ + + + def positionReceived(self, latitude, longitude): + """ + Implements C{IPositioningReceiver.positionReceived} stub. + """ + + + def positioningErrorReceived(self, positioningError): + """ + Implements C{IPositioningReceiver.positioningErrorReceived} stub. + """ + + + def altitudeReceived(self, altitude): + """ + Implements C{IPositioningReceiver.altitudeReceived} stub. + """ + + + def beaconInformationReceived(self, beaconInformation): + """ + Implements C{IPositioningReceiver.beaconInformationReceived} stub. + """ + + + +class InvalidSentence(Exception): + """ + An exception raised when a sentence is invalid. + """ + + + +class InvalidChecksum(Exception): + """ + An exception raised when the checksum of a sentence is invalid. + """ + + +class Heading(object): + """ + The heading of a mobile object. + + @ivar heading: The heading of a mobile object. C{None} if unknown. + @type heading: C{float} or C{NoneType} + @ivar variation: The (optional) variation. + The sign of the variation is positive for variations towards the east + (clockwise from north), and negative for variations towards the west + (counterclockwise from north). + If the variation is unknown or not applicable, this is C{None}. + @type variation: C{float} or C{NoneType}. + """ + def __init__(self, heading=None, variation=None): + """ + Initializes a heading with an optional variation. + """ + self.heading = heading + self.variation = variation + + if self.heading is not None and not 0 <= self.heading < 360: + raise ValueError("Bad heading (%s)" % (self.heading,)) + if self.variation is not None and not -180 < self.variation < 180: + raise ValueError("Bad variation (%s)" % (self.variation,)) + + + #@staticmethod + def _clipAngle(angle): + """ + TODO: document + """ + while angle <= 0: + angle += 360 + + while angle >= 360: + angle -= 360 + + return angle + + + _clipAngle = staticmethod(_clipAngle) + + + def _getCorrectedHeading(self): + """ + Corrects the heading by the given variation. This is sometimes known as + the true heading. + + @return: The heading, corrected by the variation. If the variation is + unknown, returns None. + @rtype: C{float} or C{NoneType} + """ + if self.variation is None: + return None + + return self._clipAngle(self.heading - self.variation) + + + correctedHeading=property(fget=_getCorrectedHeading, doc= + """ + Returns the heading, corrected for variation. + + If the variation is None, returns None. + + This calculates the raw heading minus the variation and then coerces + the value to something between 0 and 360. + + @return The corrected heading. If it is unknown or not applicable to + this heading object, returns C{None}. + @rtype: C{float} or C{NoneType} + """) + + + def __float__(self): + """ + Returns this heading as a float. + + @return: The float value of this heading. + @rtype: C{float} + """ + return self.heading + + + def __imul__(self, factor): + """ + Multiplies the variation in place. + + Note that this multiplies the variation, not the actual heading + itself. This is because multiplying the angle generally has no meaning, + wheras multiplying the variation is commonly useful (for example, for + setting the sign of the variation). + + @param factor: The factor to multiply with. + @type factor: C{float} + + @return: The mutated C{self}. + @post: The variation will be equal to the current value, multiplied by + the parameter value. + """ + self.variation *= factor + return self + + + def __repr__(self): + """ + Returns a debugging representation of a heading object. + """ + return "" % (self.heading, self.variation) + + + def __eq__(self, other): + """ + Compares two heading objects for equality. + """ + return self.heading == other.heading and \ + self.variation == other.variation + + + +class Altitude(object): + """ + An altitude. + + @ivar altitude: The altitude represented by this object, in meters. + @type altitude: C{float} + + @ivar altitudeInFeet: As above, but expressed in feet. + @type altitudeInFeet: C{float} + + The value in meters is the default. For ease of use, this value is used + when converting this object to a float: + + >>> altitude = Altitude(100) # 100 meters + >>> float(altitude) + 100.0 + >>> "The altitude is %.1f meters." % (altitude,) + 'The altitude is 100.0 meters.' + """ + def __init__(self, altitude): + """ + Initializes an altitude. + + @param altitude: The altitude in meters. + @type altitude: C{float} + """ + self.altitude = float(altitude) + + + def _getAltitudeInFeet(self): + """ + Gets the altitude of this altitude in feet. + """ + return self.altitude / METERS_PER_FOOT + + + inFeet = property(fget = _getAltitudeInFeet, doc= + """ + Returns the altitude represented by this object expressed in feet. + + @return: The altitude represented by this object expressed in feet. + @rtype: C{float} + """) + + + def __float__(self): + """ + Returns the altitude represented by this object expressed in meters. + """ + return self.altitude + + + def __eq__(self, other): + """ + Compares two altitudes for equality + + @param other: The Altitude to compare to. + @return: C{True} if the other altitude is equal to this altitude, + C{False} otherwise. + @rtype: C{bool} + """ + return self.altitude == other.altitude + + + def __repr__(self): + """ + Returns a string representation of this Altitude object. + """ + return "" % (self.altitude,) + + + +class Speed(object): + """ + An object representing the speed of a mobile object. + + @ivar speed: The speed that this object represents, in meters per second. + @type speed: C{float} + + @ivar speedInKnots: Same as above, but in knots. + @type speedInKnots: C{float} + + When converted to a float, this object will represent the speed in meters + per second. + """ + def __init__(self, speed): + """ + Initializes a speed. + + @param speed: The speed that this object represents, expressed in + meters per second. +o @type speed: C{float} + + If the provided speed is smaller than zero, raises ValueError. + + >>> Speed(-1.0) + Traceback (most recent call last): + ... + ValueError: negative speed: -1.0 + """ + if speed < 0: + raise ValueError("negative speed: %r" % (speed,)) + + self.speed = float(speed) + + + def _getSpeedInKnots(self): + return self.speed / MPS_PER_KNOT + + + speedInKnots = property(fget=_getSpeedInKnots, doc= + """ + Gets the speed represented by this object, expressed in knots. + """) + + + def __eq__(self, other): + """ + Compares two speeds for equality. + """ + return self.speed == other.speed + + + def __float__(self): + """ + Returns speed that this object represents, in meters per second. + """ + return self.speed + + + def __repr__(self): + """ + Returns a string representation of this Speed object. + """ + return "" % round(self.speed, 2) + + + +class Coordinate(object): + """ + A coordinate. + + @ivar angle: The value of the coordinate in decimal degrees, with the usual + rules for sign (northern and eastern hemispheres are positive, southern + and western hemispheres are negative). + @type angle: C{float} + + @ivar type: An optional description of the type of coordinate + this object. Should be one of ("latitude", "longitude"). + @type type: C{str} + """ + def __init__(self, angle, type=None): + """ + Initializes a coordinate. + + @param angle: The angle of this coordinate in decimal degrees. The + hemisphere is determined by the sign (north and east are positive). + @type angle: C{float} + + @param type: One of ("latitude", "longitude"). Used to return + hemisphere names. + @type type: C{str} + """ + self.angle = angle + self.type = type + + + def _getDMS(self): + """ + Gets the angle of this coordinate as a degrees, minutes, seconds tuple. + """ + degrees = abs(int(self.angle)) + fractionalDegrees = abs(self.angle - int(self.angle)) + decimalMinutes = 60 * fractionalDegrees + + minutes = int(decimalMinutes) + fractionalMinutes = decimalMinutes - int(decimalMinutes) + decimalSeconds = 60 * fractionalMinutes + + return degrees, minutes, int(round(decimalSeconds)) + + + inDegreesMinutesSeconds = property(fget=_getDMS, doc= + """ + Returns the angle of this coordinate in degrees, minutes, seconds. + """) + + + def _getHemisphere(self): + """ + Gets the hemisphere of this coordinate (one of ('N', 'E', 'S', 'W')). + """ + # REVIEW: switch inner and outer jmps, then use lookup dict? :-) + # (Or something evil like abusing indexing with a boolean (True == 1)) + if self.type == "latitude": + if self.angle > 0: + return "N" + else: + return "S" + elif self.type == "longitude": + if self.angle > 0: + return 'E' + else: + return "W" + else: + raise ValueError("Unknown coordinate type %s" % (self.type,)) + + + hemisphere = property(fget=_getHemisphere, doc= + """ + The hemisphere of this coordinate (one of ('N', 'E', 'S', 'W')). + + The hemisphere is determined from the coordinate type (latitude or + longitude) and signage of the angle in decimal degrees. + + >>> Coordinate(1.0, "latitude").hemisphere + 'N' + >>> Coordinate(-1.0, "latitude").hemisphere + 'S' + >>> Coordinate(1.0, "longitude").hemisphere + 'E' + >>> Coordinate(-1.0, "longitude").hemisphere + 'W' + """) + + + def setSign(self, sign): + """ + Sets the sign of this coordinate. + + @param sign: The sign to set, 1 for positive, -1 for negative signs. + @type sign: C{int} + """ + if sign not in (-1, 1): + raise ValueError("bad sign (got %s, expected -1 or 1)" % sign) + + self.angle = sign * abs(self.angle) + + + def __eq__(self, other): + """ + Compares two coordinates for equality. + """ + return self.type == other.type \ + and self.angle == other.angle + + + def __imul__(self, factor): + """ + Multiplies this coordinate in place. + + This is particularly useful for setting the sign of a coordinate + (multiplying by -1 or 1). + + @param factor: The factor to multiply with. + @type factor: numeric + + @return: The mutated C{self}. + @post: The coordinate will be equal to the current value, multiplied by + the parameter value. + """ + self.angle *= factor + return self + + + def __repr__(self): + """ + Returns a string representation of this coordinate. + + @return: The string representation. + @rtype: C{str} + """ + if self.type in ('latitude', 'longitude'): + coordinateType = self.type.title() + else: + coordinateType = "Coordinate of unknown type %s" % self.type + + return "<%s (%s degrees)>" % (coordinateType, round(self.angle, 2),) + + + +class PositioningError(object): + """ + Positioning error information. + """ + + ALLOWABLE_TRESHOLD = 0.01 + def __init__(self, pdop=None, hdop=None, vdop=None, testInvariant=False): + """ + Initializes a positioning error object. + + @param pdop: The position dilution of precision. + @type pdop: C{float} + @param hdop: The horizontal dilution of precision. + @type hdop: C{float} + @param vdop: The vertical dilution of precision. + @type vdop: C{float} + @param testInvariant: Flag to test if the DOP invariant is valid or + not. If C{True}, the invariant (PDOP = (HDOP**2 + VDOP**2)*.5) is + checked at every mutation. By default, this is false, because the + vast majority of DOP-providing devices ignore this invariant. + @type testInvariant: c{bool} + """ + self._pdop = pdop + self._hdop = hdop + self._vdop = vdop + + self._testInvariant = testInvariant + self._testDilutionOfPositionInvariant() + + + def _testDilutionOfPositionInvariant(self): + """ + Tests if this positioning error object satisfies the dilution of + position invariant (PDOP = (HDOP**2 + VDOP**2)*.5), unless the + C{self._testInvariant} instance variable is C{False}. + + @return: C{None} if the invariant was not satisifed or not tested. + @raises: C{ValueError} if the invariant was not satisfied. + """ + if not self._testInvariant: + return + + for x in (self.pdop, self.hdop, self.vdop): + if x is None: + return + + delta = abs(self.pdop - (self.hdop**2 + self.vdop**2)**.5) + if delta > self.ALLOWABLE_TRESHOLD: + raise ValueError("invalid combination of dilutions of precision: " + "position: %s, horizontal: %s, vertical: %s" + % (self.pdop, self.hdop, self.vdop)) + + + DOP_EXPRESSIONS = { + '_pdop': lambda self: (self._hdop**2 + self._vdop**2)**.5, + '_hdop': lambda self: (self._pdop**2 - self._vdop**2)**.5, + '_vdop': lambda self: (self._pdop**2 - self._hdop**2)**.5, + } + + + def _getDOP(self, dopType): + """ + Gets a particular dilution of position value + """ + attributeName = "_" + dopType + + if getattr(self, attributeName) is not None: + # known + return getattr(self, attributeName) + + + # REVIEW: perhaps we should replace this with a simple try/except? + others = (dop for dop in self.DOP_EXPRESSIONS if dop != attributeName) + for dop in others: + if getattr(self, dop) is None: + # At least one other DOP is None, can't calculate the last one. + return None + + # !known && calculable + return self.DOP_EXPRESSIONS[attributeName](self) + + + def _setDOP(self, dopType, value): + """ + Sets a particular dilution of position value. + + @param dopType: The type of dilution of position to set. One of + ('pdop', 'hdop', 'vdop'). + @type dopType: C{str} + + @param value: The value to set the dilution of position type to. + @type value: C{float} + """ + attributeName = "_" + dopType + setattr(self, attributeName, float(value)) + self._testDilutionOfPositionInvariant() + + + pdop = property( + fget=lambda self: self._getDOP('pdop'), + fset=lambda self, value: self._setDOP('pdop', value), + doc=""" + Returns (or calculates, if not directly available) the position + dilution of precision. + + @return: The position dilution of precision. + @rtype: C{float} or C{NoneType} if unknown + """) + + + hdop = property( + fget=lambda self: self._getDOP('hdop'), + fset=lambda self, value: self._setDOP('hdop', value), + doc=""" + Returns (or calculates, if not directly available) the horizontal + dilution of precision. + + @return: The horizontal dilution of precision. + @rtype: C{float} or C{NoneType} if unknown + """) + + + vdop = property( + fget=lambda self: self._getDOP('vdop'), + fset=lambda self, value: self._setDOP('vdop', value), + doc=""" + Returns (or calculates, if not directly available) the vertical + dilution of precision. + + @return: The vertical dilution of precision. + @rtype: C{float} or C{NoneType} if unknown + """) + + def __eq__(self, other): + """ + Compares two PositionErrors for equality. + + @return C{True} if the two positioning errors are equal (all the + relevant attributes are equal), C{False} otherwise. + @rtype: C{bool} + """ + return self.pdop == other.pdop \ + and self.hdop == other.hdop \ + and self.vdop == other.vdop + + + def __repr__(self): + """ + Returns a debugging representation of a positioning error. + """ + return "" \ + % (self.pdop, self.hdop, self.vdop) + + + +class BeaconInformation(object): + """ + Information about positioning beacons (a generalized term for the reference + objects that help you determine your position, such as satellites or cell + towers). + + @ivar beacons: A set of visible beacons. Note that visible beacons are not + necessarily used in acquiring a postioning fix. + @type beacons: C{set} of C{IPositioningBeacon} + + TODO: document seen/used ivars + + This object may be iterated to yield these beacons. + """ + def __init__(self, seen=0, used=0, beacons=None): + """ + Initializes a beacon information object. + + @ivar beacons: A collection of beacons in this beacon information + object. + @type beacons: iterable + """ + self.seen = int(seen) + self.used = int(used) + self.beacons = set(beacons or []) + + + def _getUsedBeacons(self): + return (x for x in self.beacons.value if x.isUsed) + + + usedBeacons = property(fget=_getUsedBeacons, doc= + """ + Yields the used beacons in this BeaconInformation object. + + This is different from BeaconInformation.__iter__ because it only + yields beacons that are actually used in obtaining the fix. + """) + + + def __iter__(self): + """ + Yields the beacons in this beacon information object. + """ + for beacon in self.beacons: + yield beacon + + + def __repr__(self): + """ + Returns a string representation of this beacon information object. + + @return: The string representation. + @rtype: C{str} + """ + beaconReprs = "".join(repr(beacon) for beacon in self.beacons) + return "" % beaconReprs + + + +class PositioningBeacon(object): + """ + A positioning beacon. + + @ivar identifier: The unqiue identifier for this satellite. This is usually + an integer. For GPS, this is also known as the PRN. + @type identifier: Pretty much anything that can be used as a unique + identifier. Depends on the implementation. + @ivar isUsed: True if the satellite is currently being used to acchieve a + fix, False if it is not currently being used, None if unknown. + @type isUsed: c{bool} or C{None} + """ + def __init__(self, identifier, isUsed=None): + self.identifier = identifier + self.isUsed = isUsed + + + def __hash__(self): + """ + Returns the identifier for this beacon. + """ + return self.identifier + + + def _usedRepr(self): + """ + Returns a single character representation of the status of this + satellite in terms of being used for attaining a positioning fix. + + @return: One of ("Y", "N", "?") depending on the status of the + satellite. + @rval: C{str} + """ + return {True: "Y", False: "N", None: "?"}[self.isUsed] + + + def __repr__(self): + """ + Returns a string representation of this beacon. + + @return: The string representation. + @rtype: C{str} + """ + return "" \ + % (self.identifier, self._usedRepr()) + + +class Satellite(PositioningBeacon): + """ + A satellite. + + @ivar azimuth: The azimuth of the satellite. This is the heading (positive + angle relative to true north) where the satellite appears to be to the + device. + @ivar elevation: The (positive) angle above the horizon where this + satellite appears to be to the device. + @ivar snr: The signal to noise ratio of the signal coming from this + satellite. + """ + def __init__(self, identifier, azimuth, elevation, snr, isUsed=None): + """ + Initializes a satellite object. + + @param azimuth: The azimuth of the satellite (see instance variable + documentation). + @type azimuth: C{float} + @param elevation: The elevation of the satellite (see instance variable + documentation). + @type elevation: C{float} + @param snr: The signal to noise ratio of the connection to this + satellite (see instance variable documentation). + @type snr: C{float} + + """ + super(Satellite, self).__init__(int(identifier), isUsed) + + # TODO: remove these float casts, make default arguments None + self.azimuth = float(azimuth) + self.elevation = float(elevation) + self.signalToNoiseRatio = float(snr) + + + def __repr__(self): + """ + Returns a string representation of this Satellite. + + @return: The string representation. + @rtype: C{str} + """ + return "" \ + % (self.identifier, self.azimuth, self.elevation, self._usedRepr()) === added file 'twisted/positioning/ipositioning.py' --- twisted/positioning/ipositioning.py 1970-01-01 00:00:00 +0000 +++ twisted/positioning/ipositioning.py 2009-08-30 10:56:48 +0000 @@ -0,0 +1,97 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2009 Twisted Matrix Laboratories. +# See LICENSE for details. +""" +Positioning interface. +""" +from zope.interface import Interface + + +class IPositioningReceiver(Interface): + """ + An interface for positioning providers. + """ + def positionReceived(latitude, longitude): + """ + Method called when a position is received. + + @param latitude: The latitude of the received position. + @type latitude: C{Coordinate} + @param longitude: The longitude of the received position. + @type longitude: C{Coordinate} + """ + + + def positioningErrorReceived(positioningError): + """ + Method called when positioning error is received. + + @param positioningError: The positioning error. + TODO: Create and document type. + """ + + def timeReceived(time, date): + """ + Method called when time and date information arrives. + + @param time: The time (in UTC unless otherwise specified). + @type time: C{datetime.time} + @param date: The date. + @type date: C{datetime.date} + """ + + def headingReceived(heading): + """ + Method called when a true heading is received. + + @param heading: The heading. + @type heading: C{Heading} + """ + + + def altitudeReceived(altitude): + """ + Method called when an altitude is received. + + @param altitude: The altitude. + @type altitude: C{twisted.positioning.base.Altitude} + """ + + + def speedReceived(speed): + """ + Method called when the speed is received. + + @param speed: The speed of a mobile object. + @type speed: C{twisted.positioning.base.Speed} + """ + + + def climbReceived(climb): + """ + Method called when the climb is received. + + @param climb: The climb of the mobile object. + TODO: Create and document type. + """ + + def beaconInformationReceived(beaconInformation): + """ + Method called when positioning beacon information is received. + + @param beaconInformation: The beacon information. + @type C{twisted.positioning.base.BeaconInformation} + """ + + +class INMEAReceiver(Interface): + """ + An object that can receive NMEA data. + """ + def sentenceReceived(sentence): + """ + Method called when a sentence is received. + + @param sentence: The received NMEA sentence. + @type C{twisted.positioning.nmea.NMEASentence} + """ === added file 'twisted/positioning/nmea.py' --- twisted/positioning/nmea.py 1970-01-01 00:00:00 +0000 +++ twisted/positioning/nmea.py 2009-08-30 16:42:44 +0000 @@ -0,0 +1,882 @@ +# -*- encoding: utf-8 -*- +# -*- test-case-name: twisted.positioning.test.test_nmea -*- +# Copyright (c) 2009 Twisted Matrix Laboratories. +# See LICENSE for details. +""" +Classes for using NMEAProtocol sentences. +""" +import itertools +import operator +import datetime +from zope.interface import implements + +import twisted.protocols.basic +from twisted.positioning import base, ipositioning + + +class NMEAProtocol(twisted.protocols.basic.LineReceiver): + """ + A protocol that parses and verifies the checksum of an NMEAProtocol sentence, and + delegates to a receiver. + + Responsibilities: + - receiving lines (which are hopefully sentences) + - verifying their checksum + - unpacking them (mapping of sentence element keys to their values) + - creating C{NMEASentence} objects + - passing them to the receiver. + """ + def __init__(self, receiver): + """ + Initializes an receiver for NMEAProtocol sentences. + + @param receiver: A receiver for NMEAProtocol sentence objects. + @type receiver: L{INMEAReceiver} + """ + self.receiver = receiver + + + METHOD_PREFIX = "nmea_" + + + def lineReceived(self, rawSentence): + """ + Parses the data from the sentence and validates the checksum. + + @param rawSentence: The raw positioning sentence. + @type rawSentence: C{str} (bytestring encoded in ascii) + """ + sentence = rawSentence.strip() + + self.validateChecksum(sentence) + splitSentence = self.splitSentence(sentence) + + sentenceType, contents = splitSentence[0], splitSentence[1:] + + keys = self.SENTENCE_CONTENTS.get(sentenceType, None) + callback = getattr(self, self.METHOD_PREFIX + sentenceType, None) + + if keys is None or callback is None: + raise ValueError("unknown sentence type %s" % sentenceType) + + sentenceData = {"type": sentenceType} + for key, value in itertools.izip(keys, contents): + if key is not None and value != "": + sentenceData[key] = value + + sentence = NMEASentence(sentenceData) + + callback(sentence) + + if self.receiver is not None: + self.receiver.sentenceReceived(sentence) + + + #@staticmethod + def validateChecksum(sentence): + """ + Validates the checksum of an NMEAProtocol sentence. + + Does nothing (except implicitly return None, of course) on sentences + with valid checksums. + + >>> NMEASentence.validateChecksum( + ... '$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47' + ... ) + + Same thing on sentences missing a checksum: + + >>> NMEASentence.validateChecksum( + ... '$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*') + + Will raise an exception on sentences with a missing checksum: + + >>> NMEASentence.validateChecksum( + ... '$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*46' + ... ) + Traceback (most recent call last): + ... + InvalidChecksum: 71 != 70 + """ + if sentence[-3] == '*': # sentence has a checksum + reference, source = int(sentence[-2:], 16), sentence[1:-3] + computed = reduce(operator.xor, (ord(x) for x in source)) + if computed != reference: + raise base.InvalidChecksum("%02x != %02x" + % (computed, reference)) + + + validateChecksum = staticmethod(validateChecksum) + + + #@staticmethod + def splitSentence(sentence): + """ + Returns the split version of the sentence, minus header and checksum. + + >>> NMEASentence.splitSentence("$GPGGA,spam,eggs*00") + ['GPGGA', 'spam', 'eggs'] + """ + if sentence[-3] == "*": # sentence with checksum + return sentence[1:-3].split(',') + elif sentence[-1] == "*": # sentence without checksum + return sentence[1:-1].split(',') + else: + raise base.InvalidSentence("malformed sentence %s" % sentence) + + + splitSentence = staticmethod(splitSentence) + + + def nmea_GPGGA(self, sentence): + """ + Callback called when a GGA sentence is received. + """ + + + def nmea_GPRMC(self, sentence): + """ + Callback called when an RMC sentence is received. + """ + + + def nmea_GPGSV(self, sentence): + """ + Callback called when a GSV sentence is received. + """ + + + def nmea_GPGLL(self, sentence): + """ + Callback called when a GGL sentence is received. + """ + + + def nmea_GPHDT(self, sentence): + """ + Callback called when an HDT sentence is received. + """ + + + def nmea_GPGSA(self, sentence): + """ + Callback called when a GSA sentence is received. + """ + + + + SENTENCE_CONTENTS = { + 'GPGGA': [ + 'timestamp', + + 'latitudeFloat', + 'latitudeHemisphere', + 'longitudeFloat', + 'longitudeHemisphere', + + 'validGGA', + 'numberOfSatellitesSeen', + 'horizontalDilutionOfPrecision', + + 'altitude', + 'altitudeUnits', + 'heightOfGeoidAboveWGS84', + 'heightOfGeoidAboveWGS84Units', + + # TODO: DGPS information + ], + + 'GPRMC': [ + 'timestamp', + + 'validRMC', + + 'latitudeFloat', + 'latitudeHemisphere', + 'longitudeFloat', + 'longitudeHemisphere', + + 'speedInKnots', + + 'trueHeading', + + 'datestamp', + + 'magneticVariation', + 'magneticVariationDirection', + ], + + 'GPGSV': [ + 'numberOfGSVSentences', + 'GSVSentenceIndex', + + 'numberOfSatellitesSeen', + + 'satellitePRN_0', + 'elevation_0', + 'azimuth_0', + 'signalToNoiseRatio_0', + + 'satellitePRN_1', + 'elevation_1', + 'azimuth_1', + 'signalToNoiseRatio_1', + + 'satellitePRN_2', + 'elevation_2', + 'azimuth_2', + 'signalToNoiseRatio_2', + + 'satellitePRN_3', + 'elevation_3', + 'azimuth_3', + 'signalToNoiseRatio_3', + ], + + 'GPGLL': [ + 'latitudeFloat', + 'latitudeHemisphere', + 'longitudeFloat', + 'longitudeHemisphere', + 'timestamp', + 'validGLL', + ], + + 'GPHDT': [ + 'trueHeading', + ], + + 'GPTRF': [ + 'datestamp', + 'timestamp', + + 'latitudeFloat', + 'latitudeHemisphere', + 'longitudeFloat', + 'longitudeHemisphere', + + # TODO: actually use these: + 'elevation', + 'numberOfIterations', + 'numberOfDopplerIntervals', + 'updateDistanceInNauticalMiles', + 'satellitePRN', + ], + + 'GPGSA': [ + None, # like GPRMCMode + None, # like GPGGAFixQuality + + 'usedSatellitePRN_0', + 'usedSatellitePRN_1', + 'usedSatellitePRN_2', + 'usedSatellitePRN_3', + 'usedSatellitePRN_4', + 'usedSatellitePRN_5', + 'usedSatellitePRN_6', + 'usedSatellitePRN_7', + 'usedSatellitePRN_8', + 'usedSatellitePRN_9', + 'usedSatellitePRN_10', + 'usedSatellitePRN_11', + + 'positionDilutionOfPrecision', + 'horizontalDilutionOfPrecision', + 'verticalDilutionOfPrecision', + ] + } + + + +class NMEASentence(object): + """ + An object representing an NMEAProtocol sentence. + + The attributes of this objects are raw NMEAProtocol representations, which + are bytestrings encoded in ASCII. + + @ivar type: The sentence type ("GPGGA", "GPGSV"...). + + This object contains the raw NMEAProtocol representations in a sentence. + Not all of these necessarily have to be present in the sentence. Missing + attributes are None when accessed. + + @ivar timestamp: An NMEAProtocol timestamp. ("123456" -> 12:34:56Z) + + @ivar latitudeFloat: The NMEAProtocol angular representation of a latitude + (for example: "1234.567" -> 12 degrees, 34.567 minutes). + @ivar latitudeHemisphere: The NMEAProtocol representation of a latitudinal + hemisphere ("N" or "S"). + @ivar longitudeFloat: The NMEAProtocol angular representation of a + longitude. See C{latitudeFloat} for an example. + @ivar longitudeHemisphere: The NMEAProtocol representation of a + longitudinal hemisphere ("E" or "W"). + + TODO: finish documenting these attributes + + """ + def __init__(self, sentenceData): + """ + Initializes an NMEAProtocol sentence from parsed sentence data. + """ + super(NMEASentence, self).__init__() + self._sentenceData = sentenceData + + ## REVIEW: would properties like this be okay? Does that mean we can remove + ## the ivar docs in the class docstring? Would a bunch of getters, like: + ## + ## def _getLatitudeFloat(self): + ## return self._sentenceData['latitudeFloat'] + ## + ## and then property(fget=_getLatitudeFloat) be preferable? That seems like + ## a lot of repetitive code. + latitudeFloat = property( + fget=lambda self: self._sentenceData['latitudeFloat'], + doc=""" + The NMEAProtocol angular representation of a latitude. + + For example: "1234.567" -> 12 degrees, 34.567 minutes. + """) + + def __getattr__(self, name): + """ + Gets an attribute from the internal sentence dictionary. + """ + # TODO: remove + return self._sentenceData.get(name, None) + + + def _isFirstGSVSentence(self): + """ + Tests if this current GSV sentence is the first one in a sequence. + """ + return self.GSVSentenceIndex == "1" + + + def _isLastGSVSentence(self): + """ + Tests if this current GSV sentence is the final one in a sequence. + """ + return self.GSVSentenceIndex == self.numberOfGSVSentences + + + keys = property(fget=lambda self: iter(self._sentenceData), doc= + """ + Returns an iterator that iterates over the names of attributes present + in this sentence. + """) + + + def __repr__(self): + """ + Returns a textual representation of this NMEA sentence. + + Note that this object represents a sentence that has already been + parsed -- this method does not return the raw serialized NMEA sentence. + """ + return "" % (repr(self._sentenceData),) + + + +MODE_AUTO, MODE_MANUAL = 'A', 'M' +RMC_INVALID, RMC_VALID = 'V', 'A' +GLL_INVALID, GLL_VALID = 'V', 'A' +GGA_INVALID, GGA_GPS_FIX, GGA_DGPS_FIX = 1, 2, 3 + + + +class NMEAAdapter(object): + """ + An adapter from NMEAProtocol receivers to positioning receivers. + + @cvar DATESTAMP_HANDLING: Way to handle dates. One of (C{'intelligent'} + (default, if the last two digits are greater than the intelligent data + threshold, assumes twentieth century, otherwise assumes twenty-first + century.), C{'19xx'} (assumes dates start with '19'), or C{'20xx'} + (assumes dates start with '20'). + @type DATESTAMP_HANDLING: C{str} + + @cvar INTELLIGENT_DATE_THRESHOLD: The threshold that determines which + century we guess a year is in. If the year value in a sentence is above + this value, assumes the 20th century (19xx), otherwise assumes the + twenty-first century (20xx). + @type INTELLIGENT_DATE_THRESHOLD: C{int} + """ + implements(ipositioning.INMEAReceiver) + + + def __init__(self, positioningReceiver): + """ + Initializes a new NMEA adapter. + + @param positioningReceiver: The receiver for positioning sentences. + @type positioningReceiver: C{ipositioning.IPositioningReceiver} + """ + self._state = {} + self._sentenceData = {} + self._receiver = positioningReceiver + + + def _fixTimestamp(self): + """ + Turns the NMEAProtocol timestamp notation into a datetime.time object. + The time in this object is expressed as Zulu time. + """ + timestamp = self.currentSentence.timestamp.split('.')[0] + timeObject = datetime.datetime.strptime(timestamp, '%H%M%S').time() + self._sentenceData['time'] = timeObject + + + DATESTAMP_HANDLING = 'intelligent' + INTELLIGENT_DATE_THRESHOLD = 80 + + + def _fixDatestamp(self): + """ + Turns an NMEA datestamp format into a Python datetime.date object. + """ + datestamp = self.currentSentence.datestamp + + day, month, year = [int(ordinalString) for ordinalString in + (datestamp[0:2], datestamp[2:4], datestamp[4:6])] + + # REVIEW: this can be optimized into a dict lookup + call, if I could + # have ternaries in 2.3 to do the intelligent datestamp handling. + if self.DATESTAMP_HANDLING == 'intelligent': + if year > self.INTELLIGENT_DATE_THRESHOLD: + year = int('19%02d' % year) + else: + year = int('20%02d' % year) + + elif self.DATESTAMP_HANDLING == '20xx': + year = int('20%02d' % year) + + elif self.DATESTAMP_HANDLING == '19xx': + year = int('19%02d' % year) + + else: + raise ValueError("unknown datestamp handling method (%s)" + % (self.DATESTAMP_HANDLING,)) + + self._sentenceData['date'] = datetime.date(year, month, day) + + + def _fixCoordinateFloat(self, coordinate): + """ + Turns the NMEAProtocol coordinate format into Python float. + + @param coordinate: The coordinate type: 'latitude' or 'longitude'. + @type coordinate: C{str} + """ + nmeaCoordinate = getattr(self.currentSentence, coordinate + 'Float') + left, right = nmeaCoordinate.split('.') + degrees, minutes = int(left[:-2]), float("%s.%s" % (left[-2:], right)) + + self._sentenceData[coordinate] = base.Coordinate(degrees + minutes/60, + coordinate) + + + ALLOWED_HEMISPHERE_LETTERS = { + "latitude": "NS", + 'longitude': "EW", + "magneticVariation": "EW", + } + + + def _fixHemisphereSign(self, coordinate, sentenceDataKey=None): + """ + Fixes the sign for a hemisphere. + + @param coordinate: Coordinate type (latitude, longitude or + magneticVariation). + @type coordinate: C{str} + + This method must be called after the magnitude for the thing it + determines the sign of has been set. This is done by the following + functions: + + - C{self.FIXERS['magneticVariation']} + - C{self.FIXERS['latitudeFloat']} + - C{self.FIXERS['longitudeFloat']} + """ + sentenceDataKey = sentenceDataKey or coordinate + sign = self._getHemisphereSign(coordinate) + self._sentenceData[sentenceDataKey] *= sign + + + COORDINATE_SIGNS = { + 'N': 1, + 'E': 1, + 'S': -1, + 'W': -1 + } + + + def _getHemisphereSign(self, coordinate): + """ + Returns the hemisphere sign for a given coordinate. + + @param coordinate: Coordinate type (latitude, longitude or + magneticVariation). + @type coordinate: C{str} + """ + if coordinate in ('latitude', 'longitude'): + hemisphereKey = coordinate + 'Hemisphere' + else: + hemisphereKey = coordinate + 'Direction' + + hemisphere = getattr(self.currentSentence, hemisphereKey) + + try: + return self.COORDINATE_SIGNS[hemisphere.upper()] + except KeyError: + raise ValueError("bad hemisphere/direction: %s" % hemisphere) + + + def _convert(self, sourceKey, converter=float, destinationKey=None): + """ + A simple conversion fix. + """ + currentValue = getattr(self.currentSentence, sourceKey) + + if destinationKey is None: + destinationKey = sourceKey + + self._sentenceData[destinationKey] = converter(currentValue) + + + + STATEFUL_UPDATE = { + # sentenceKey: (stateKey, factory, attributeName, converter), + 'trueHeading': + ('heading', base.Heading, 'heading', float), + 'magneticVariation': + ('heading', base.Heading, 'variation', float), + + 'horizontalDilutionOfPrecision': + ('positioningError', base.PositioningError, 'hdop', float), + 'verticalDilutionOfPrecision': + ('positioningError', base.PositioningError, 'vdop', float), + 'positionDilutionOfPrecision': + ('positioningError', base.PositioningError, 'pdop', float), + + } + + + def _statefulUpdate(self, sentenceKey): + """ + Does a stateful update of a particular positioning attribute. + + @param stateKey: The name of the key in the sentence attributes, the + adapter state, and the NMEAAdapter.STATEFUL_UPDATE dict. + @type stateKey: C{str} + """ + stateKey, factory, attributeName, converter \ + = self.STATEFUL_UPDATE[sentenceKey] + + if stateKey not in self._sentenceData: + self._sentenceData[stateKey] = self._state.get(stateKey, factory()) + + newValue = converter(getattr(self.currentSentence, sentenceKey)) + setattr(self._sentenceData[stateKey], attributeName, newValue) + + + ACCEPTABLE_UNITS = frozenset(['M']) + UNIT_CONVERTERS = { + 'N': lambda inKnots: base.Speed(float(inKnots) * base.MPS_PER_KNOT), + 'K': lambda inKPH: base.Speed(float(inKPH) * base.MPS_PER_KPH), + } + + + def _fixUnits(self, unitKey=None, valueKey=None, sourceKey=None, unit=None): + """ + Fixes the units of a certain value. + + At least one of C{unit}, C{unitKey} must be provided. + + If the C{valueKey} is not provided, will attempt to strip "Units" from + the C{unitKey}. + + If the C{sourceKey} is not provided. will store the new data in the + same place as the C{valueKey}. + """ + unit = unit or getattr(self.currentSentence, unitKey) + valueKey = valueKey or unitKey.strip('Units') + sourceKey = sourceKey or valueKey + + if unit not in self.ACCEPTABLE_UNITS: + converter = self.UNIT_CONVERTERS[unit] + currentValue = getattr(self.currentSentence, sourceKey) + self._sentenceData[valueKey] = converter(currentValue) + + + GSV_KEYS = "satellitePRN", "azimuth", "elevation", "signalToNoiseRatio" + + + def _fixGSV(self): + """ + Parses partial visible satellite information from a GSV sentence. + """ + # To anyone who knows NMEA, this method's name should raise a chuckle's + # worth of schadenfreude. 'Fix' GSV? Hah! Ludicrous. + self._sentenceData['_partialBeaconInformation'] = base.BeaconInformation() + + for index in range(4): + prn, azimuth, elevation, snr = \ + [getattr(self.currentSentence, "%s_%s" % (gsvKey, index)) + for gsvKey in self.GSV_KEYS] + + if prn is None or snr is None: + continue # continue not break, to accomodate for some bad gpses + + satellite = base.Satellite(prn, azimuth, elevation, snr) + self._sentenceData['_partialBeaconInformation'].beacons.add(satellite) + + + def _fixBeacons(self, predicate): + """ + TODO: document + + @param predicate: One of C{"seen"} or C{"used"}. + @type predicate: C{str} + """ + ## TODO: refactor mercilessly to a statefulUpdate + if "beaconInformation" not in self._state: + self._state["beaconInformation"] = base.BeaconInformation() + + # TODO: Write tests for this!!! -- lvh + informationKey = predicate.lower() + sentenceKey = "beacons" + predicate.title() + + setattr(self._state["beaconInformation"], informationKey, + int(getattr(self.currentSentence, sentenceKey))) + + + def _fixGSA(self): + """ + Extracts the information regarding which satellites were used in + obtaining the GPS fix from a GSA sentence. + + @pre: A GSA sentence was fired. + @post: The current sentence data (C{self._sentenceData} will contain a + set of the currently used PRNs (under the key C{_usedPRNs}. + """ + self._sentenceData['_usedPRNs'] = set() + for key in ("usedSatellitePRN_%d" % x for x in range(12)): + prn = getattr(self.currentSentence, key, None) + if prn is not None: + self._sentenceData['_usedPRNs'].add(int(prn)) + + + + SENTENCE_INVALIDITY = { + "RMC": RMC_INVALID, + "GGA": GGA_INVALID, + "GLL": GLL_INVALID, + } + + + def _validate(self, sentenceType): + """ + Tests if a sentence contains a valid fix. + + Some sentences (GGA, RMC...) contain information on the validity of the + fix. + """ + invalidValue = self.SENTENCE_INVALIDITY[sentenceType] + thisValue = getattr(self.currentSentence, "valid" + sentenceType) + + if thisValue == invalidValue: + raise base.InvalidSentence("bad %s validity" % sentenceType) + + + SPECIFIC_SENTENCE_FIXES = { + 'GPGSV': _fixGSV, + 'GPGSA': _fixGSA, + } + + + def _sentenceSpecificFix(self): + """ + Executes a fix for a specific type of sentence. + """ + fixer = self.SPECIFIC_SENTENCE_FIXES.get(self.currentSentence.type) + if fixer is not None: + fixer(self) + + + FIXERS = { + 'type': + lambda self: self._sentenceSpecificFix(), + + 'timestamp': + lambda self: self._fixTimestamp(), + 'datestamp': + lambda self: self._fixDatestamp(), + + 'latitudeFloat': + lambda self: self._fixCoordinateFloat(coordinate='latitude'), + 'latitudeHemisphere': + lambda self: self._fixHemisphereSign('latitude'), + 'longitudeFloat': + lambda self: self._fixCoordinateFloat(coordinate='longitude'), + 'longitudeHemisphere': + lambda self: self._fixHemisphereSign('longitude'), + + 'altitude': + lambda self: self._convert('altitude', + converter=base.Altitude), + 'altitudeUnits': + lambda self: self._fixUnits(unitKey='altitudeUnits'), + + 'heightOfGeoidAboveWGS84': + lambda self: self._convert('heightOfGeoidAboveWGS84', + converter=base.Altitude), + 'heightOfGeoidAboveWGS84Units': + lambda self: self._fixUnits( + unitKey='heightOfGeoidAboveWGS84Units'), + + 'trueHeading': + lambda self: self._statefulUpdate('trueHeading'), + 'magneticVariation': + lambda self: self._statefulUpdate('magneticVariation'), + + 'magneticVariationDirection': + lambda self: self._fixHemisphereSign('magneticVariation', + 'heading'), + + 'speedInKnots': + lambda self: self._fixUnits(valueKey='speed', + sourceKey='speedInKnots', + unit='N'), + + 'validGGA': + lambda self: self._validate('GGA'), + 'validRMC': + lambda self: self._validate('RMC'), + 'validGLL': + lambda self: self._validate('GLL'), + + 'numberOfBeaconsUsed': + lambda self: self._fixBeacons('used'), + 'numberOfBeaconsSeen': + lambda self: self._fixBeacons('seen'), + + 'positionDilutionOfPrecision': + lambda self: self._statefulUpdate('positionDilutionOfPrecision'), + 'horizontalDilutionOfPrecision': + lambda self: self._statefulUpdate('horizontalDilutionOfPrecision'), + 'verticalDilutionOfPrecision': + lambda self: self._statefulUpdate('verticalDilutionOfPrecision'), + } + + + def clear(self): + """ + Resets this adapter. + + This will empty the adapter state and the current sentence data. + """ + self._state = {} + self._sentenceData = {} + + + def sentenceReceived(self, sentence): + """ + Called when a sentence is received. + + Will clean the received NMEAProtocol sentence up, and then update the + adapter's state, followed by firing the callbacks. + + If the received sentence was invalid, the state will be cleared. + + @param sentence: The sentence that is received. + @type sentence: C{NMEASentence} + """ + self.currentSentence = sentence + + try: + self._cleanCurrentSentence() + self._updateSentence() + self._fireSentenceCallbacks() + except base.InvalidSentence: + self.clear() + + + def _cleanCurrentSentence(self): + """ + Cleans the current sentence. + """ + for key in sorted(self.currentSentence.keys): + fixer = self.FIXERS.get(key, None) + + if fixer is not None: + fixer(self) + + + def _updateSentence(self): + """ + Updates the current state with the new information from the sentence. + """ + self._updateBeaconInformation() + self._state.update(self._sentenceData) + + + def _updateBeaconInformation(self): + """ + Updates existing beacon information state with new data. + """ + new = self._sentenceData.get('_partialBeaconInformation') + if new is None: + return + + usedPRNs = self._state.get('_usedPRNs') \ + or self._sentenceData.get('_usedPRNs') + if usedPRNs is not None: + for beacon in new.beacons: + beacon.isUsed = (beacon.identifier in usedPRNs) + + old = self._state.get('_partialBeaconInformation') + if old is not None: + new.beacons.update(old.beacons) + + if self.currentSentence._isLastGSVSentence(): + del self._state['_partialBeaconInformation'] + bi = self._sentenceData.pop('_partialBeaconInformation') + self._sentenceData['beaconInformation'] = bi + + + def _fireSentenceCallbacks(self): + """ + Fires sentence callbacks for the current sentence. + + A callback will only fire if all of the keys it requires are present in + the current state and at least one such field was altered in the + current sentence. + + The callbacks will only be fired with data from C{self._state}. + """ + for callbackName, requiredFields in self.REQUIRED_CALLBACK_FIELDS.items(): + callback = getattr(self._receiver, callbackName, None) + + if callback is None: + continue + + kwargs = {} + atLeastOnePresentInSentence = False + + try: + for field in requiredFields: + if field in self._sentenceData: + atLeastOnePresentInSentence = True + kwargs[field] = self._state[field] + except KeyError: + continue + + if atLeastOnePresentInSentence: + callback(**kwargs) + + +NMEAAdapter.REQUIRED_CALLBACK_FIELDS = dict((name, method.positional) + for name, method + in ipositioning.IPositioningReceiver.namesAndDescriptions()) === added directory 'twisted/positioning/test' === added file 'twisted/positioning/test/__init__.py' === added file 'twisted/positioning/test/test_nmea.py' --- twisted/positioning/test/test_nmea.py 1970-01-01 00:00:00 +0000 +++ twisted/positioning/test/test_nmea.py 2009-08-30 11:18:53 +0000 @@ -0,0 +1,665 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2009 Twisted Matrix Laboratories. +# See LICENSE for details. +""" +Test cases for using NMEA sentences. +""" +import datetime +from zope.interface import implements + +from twisted.positioning import base, nmea, ipositioning +from twisted.trial.unittest import TestCase + +class NMEATestReceiver(object): + implements(ipositioning.INMEAReceiver) + + def __init__(self): + self.clear() + + + def clear(self): + self.receivedSentence = None + + + def sentenceReceived(self, sentence): + self.receivedSentence = sentence + + + +class CallbackTestNMEAProtocol(nmea.NMEAProtocol): + """ + A class that tests that the correct callbacks are called. + """ + def __init__(self): + nmea.NMEAProtocol.__init__(self, None) + + for sentenceType in nmea.NMEAProtocol.SENTENCE_CONTENTS: + self._createCallback(sentenceType) + + self.clear() + + + def clear(self): + self.sentenceReceived = None + self.called = {} + + + SENTENCE_TYPES = [x for x in nmea.NMEAProtocol.SENTENCE_CONTENTS] + + def _createCallback(self, sentenceType): + """ + Creates a callback for an NMEA sentence. + """ + def callback(sentence): + self.sentenceReceived = sentence + self.called[sentenceType] = True + + setattr(self, "nmea_" + sentenceType, callback) + + + +class NMEATests(TestCase): + def setUp(self): + self.callbackProtocol = CallbackTestNMEAProtocol() + + self.receiver = NMEATestReceiver() + self.receiverProtocol = nmea.NMEAProtocol(self.receiver) + + + def test_callbacksCalled(self): + """ + Tests that the correct callbacks fire, and that *only* those fire. + """ + sentencesByType = { + "GPGGA": [ + "$GPGGA*56", + ], + + "GPRMC": [ + "$GPRMC*4b", + ], + + "GPGSV": [ + "$GPGSV*55", + ], + + "GPGLL": [ + "$GPGLL*50", + ], + + "GPHDT": [ + "$GPHDT*4f", + ], + + "GPGSA": [ + "$GPGSA*42", + ], + + } + + for calledSentenceType in sentencesByType: + for sentence in sentencesByType[calledSentenceType]: + self.callbackProtocol.lineReceived(sentence) + called = self.callbackProtocol.called + + for sentenceType in CallbackTestNMEAProtocol.SENTENCE_TYPES: + self.assertEquals(sentenceType == calledSentenceType, + called.get(sentenceType, False)) + + self.callbackProtocol.clear() + + self.receiverProtocol.lineReceived(sentence) + self.assertTrue(self.receiver.receivedSentence) + + + + def test_validateGoodChecksum(self): + """ + Tests checkum validation for good or missing checksums. + """ + sentences = [ + '$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47', + '$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*', + ] + + for sentence in sentences: + nmea.NMEAProtocol.validateChecksum(sentence) + + + def test_validateBadChecksum(self): + """ + Tests checksum validation on bad checksums. + """ + sentences = [ + '$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*46', + ] + + for sentence in sentences: + self.assertRaises(base.InvalidChecksum, + nmea.NMEAProtocol.validateChecksum, sentence) + + + def test_GSVFirstSequence(self): + """ + Tests if the last sentence in a GSV sequence is correctly identified. + """ + string = '$GPGSV,3,1,11,22,42,067,42,24,14,311,43,27,05,244,00,,,,*4F' + self.callbackProtocol.lineReceived(string) + sentence = self.callbackProtocol.sentenceReceived + + self.assertTrue(sentence._isFirstGSVSentence()) + self.assertFalse(sentence._isLastGSVSentence()) + + + def test_GSVLastSentence(self): + """ + Tests if the last sentence in a GSV sequence is correctly identified. + """ + string = '$GPGSV,3,3,11,22,42,067,42,24,14,311,43,27,05,244,00,,,,*4D' + self.callbackProtocol.lineReceived(string) + sentence = self.callbackProtocol.sentenceReceived + + self.assertFalse(sentence._isFirstGSVSentence()) + self.assertTrue(sentence._isLastGSVSentence()) + + + def test_parsing(self): + """ + Tests the parsing of a few sentences. + """ + sentences = { + # Full GPRMC sentence. + '$GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*6A': { + 'type': 'GPRMC', + 'latitudeFloat': '4807.038', + 'latitudeHemisphere': 'N', + 'longitudeFloat': '01131.000', + 'longitudeHemisphere': 'E', + 'magneticVariation': '003.1', + 'magneticVariationDirection': 'W', + 'speedInKnots': '022.4', + 'timestamp': '123519', + 'datestamp': '230394', + 'trueHeading': '084.4', + 'validRMC': 'A', + }, + + # Full GPGGA sentence. + '$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47': { + 'type': 'GPGGA', + + 'altitude': '545.4', + 'altitudeUnits': 'M', + 'heightOfGeoidAboveWGS84': '46.9', + 'heightOfGeoidAboveWGS84Units': 'M', + + 'horizontalDilutionOfPrecision': '0.9', + + 'latitudeFloat': '4807.038', + 'latitudeHemisphere': 'N', + 'longitudeFloat': '01131.000', + 'longitudeHemisphere': 'E', + + 'numberOfSatellitesSeen': '08', + 'timestamp': '123519', + 'validGGA': '1', + }, + + # Partial GPGLL sentence. + '$GPGLL,3751.65,S,14507.36,E*77': { + 'type': 'GPGLL', + + 'latitudeFloat': '3751.65', + 'latitudeHemisphere': 'S', + 'longitudeFloat': '14507.36', + 'longitudeHemisphere': 'E', + }, + + # Full GPGLL sentence. + '$GPGLL,4916.45,N,12311.12,W,225444,A*31': { + 'type': 'GPGLL', + + 'latitudeFloat': '4916.45', + 'latitudeHemisphere': 'N', + 'longitudeFloat': '12311.12', + 'longitudeHemisphere': 'W', + + 'timestamp': '225444', + 'validGLL': 'A', + }, + + # Full GPGSV sentence. + '$GPGSV,3,1,11,03,03,111,00,04,15,270,00,06,01,010,00,13,06,292,00*74': { + 'type': 'GPGSV', + 'GSVSentenceIndex': '1', + 'numberOfGSVSentences': '3', + 'numberOfSatellitesSeen': '11', + + 'azimuth_0': '111', + 'azimuth_1': '270', + 'azimuth_2': '010', + 'azimuth_3': '292', + + 'elevation_0': '03', + 'elevation_1': '15', + 'elevation_2': '01', + 'elevation_3': '06', + + 'satellitePRN_0': '03', + 'satellitePRN_1': '04', + 'satellitePRN_2': '06', + 'satellitePRN_3': '13', + + 'signalToNoiseRatio_0': '00', + 'signalToNoiseRatio_1': '00', + 'signalToNoiseRatio_2': '00', + 'signalToNoiseRatio_3': '00', + }, + + # Partially empty GSV sentence support. + '$GPGSV,3,3,11,22,42,067,42,24,14,311,43,27,05,244,00,,,,*4D': { + 'type': 'GPGSV', + 'GSVSentenceIndex': '3', + 'numberOfGSVSentences': '3', + 'numberOfSatellitesSeen': '11', + + 'azimuth_0': '067', + 'azimuth_1': '311', + 'azimuth_2': '244', + + 'elevation_0': '42', + 'elevation_1': '14', + 'elevation_2': '05', + + 'satellitePRN_0': '22', + 'satellitePRN_1': '24', + 'satellitePRN_2': '27', + + 'signalToNoiseRatio_0': '42', + 'signalToNoiseRatio_1': '43', + 'signalToNoiseRatio_2': '00', + }, + + # Full HDT sentence. + '$GPHDT,038.005,T*3B': { + 'type': 'GPHDT', + 'trueHeading': '038.005', + }, + + # Full TRG sentence. + # TODO: fill + + # Typical GPGSA sentence. + '$GPGSA,A,3,19,28,14,18,27,22,31,39,,,,,1.7,1.0,1.3*34': { + 'type': 'GPGSA', + + 'usedSatellitePRN_0': '19', + 'usedSatellitePRN_1': '28', + 'usedSatellitePRN_2': '14', + 'usedSatellitePRN_3': '18', + 'usedSatellitePRN_4': '27', + 'usedSatellitePRN_5': '22', + 'usedSatellitePRN_6': '31', + 'usedSatellitePRN_7': '39', + + 'positionDilutionOfPrecision': '1.7', + 'horizontalDilutionOfPrecision': '1.0', + 'verticalDilutionOfPrecision': '1.3', + }, + + + } + + for sentence, expected in sentences.iteritems(): + self.callbackProtocol.lineReceived(sentence) + received = self.callbackProtocol.sentenceReceived + self.assertEquals(expected, received._sentenceData) + self.callbackProtocol.clear() + + + +class NMEAAdapterConverterTests(TestCase): + """ + Tests for the converters on an NMEA adapter. + """ + def setUp(self): + self.adapter = nmea.NMEAAdapter(None) + + + def test_fixTimestamp(self): + self._genericFixerTest( + {'timestamp': '123456'}, # 12:34:56Z + {'time': datetime.time(12, 34, 56)}) + + + def test_fixBrokenTimestamp(self): + self._genericFixerRaisingTest( + {'timestamp': '993456'}, ValueError) + self._genericFixerRaisingTest( + {'timestamp': '129956'}, ValueError) + self._genericFixerRaisingTest( + {'timestamp': '123499'}, ValueError) + + + def test_fixDatestamp_intelligent(self): + self._genericFixerTest( + {'datestamp': '010199'}, + {'date': datetime.date(1999, 1, 1)}) + + self._genericFixerTest( + {'datestamp': '010109'}, + {'date': datetime.date(2009, 1, 1)}) + + + def test_fixDatestamp_19xx(self): + self.adapter.DATESTAMP_HANDLING = '19xx' + + self._genericFixerTest( + {'datestamp': '010199'}, + {'date': datetime.date(1999, 1, 1)}) + + self._genericFixerTest( + {'datestamp': '010109'}, + {'date': datetime.date(1909, 1, 1)}) + + + def test_fixDatestamp_20xx(self): + self.adapter.DATESTAMP_HANDLING = '20xx' + + self._genericFixerTest( + {'datestamp': '010199'}, + {'date': datetime.date(2099, 1, 1)}) + + self._genericFixerTest( + {'datestamp': '010109'}, + {'date': datetime.date(2009, 1, 1)}) + + + def test_fixBrokenDatestamp(self): + self._genericFixerRaisingTest({'datestamp': '123456'}, ValueError) + + + def test_coordinate_north(self): + self._genericFixerTest( + {'latitudeFloat': '1030.000', 'latitudeHemisphere': 'N'}, + {'latitude': base.Coordinate(10+30.000/60, 'latitude')}) + + + def test_coordinate_south(self): + self._genericFixerTest( + {'latitudeFloat': '4512.145', 'latitudeHemisphere': 'S'}, + {'latitude': base.Coordinate(-(45 + 12.145/60), 'latitude')}) + + + def test_coordinate_east(self): + self._genericFixerTest( + {'longitudeFloat': '5331.513', 'longitudeHemisphere': 'E'}, + {'longitude': base.Coordinate(53 + 31.513/60, 'longitude')}) + + + def test_coordinate_west(self): + self._genericFixerTest( + {'longitudeFloat': '1245.120', 'longitudeHemisphere': 'W'}, + {'longitude': base.Coordinate(-(12 + 45.12/60), 'longitude')}) + + + def test_fixHemisphereSignBadHemispheres(self): + self._genericFixerRaisingTest({'longitudeHemisphere': 'Q'}, + ValueError) + + + def test_fixAltitude(self): + self._genericFixerTest( + {'altitude': '545.4'}, + {'altitude': base.Altitude(545.4)}) + + self._genericFixerTest( + {'heightOfGeoidAboveWGS84': '46.9'}, + {'heightOfGeoidAboveWGS84': base.Altitude(46.9)}) + + + def test_validation(self): + """ + Tests validation of sentences. + + Invalid sentences will cause the state to be cleared. The altitude is + added so we have junk data (that will hopefully be removed, since the + GPS is telling us that this data is invalid). + """ + self._genericFixerTest( + {'altitude': '545.4', 'validGGA': nmea.GGA_GPS_FIX}, + {'altitude': base.Altitude(545.4)}) + self._genericFixerTest( + {'altitude': '545.4', 'validGGA': nmea.GGA_INVALID}, + {}) + + self._genericFixerTest( + {'altitude': '545.4', 'validGLL': nmea.GLL_VALID}, + {'altitude': base.Altitude(545.4)}) + self._genericFixerTest( + {'altitude': '545.4', 'validGLL': nmea.GLL_INVALID}, + {}) + + + def test_speedInKnots(self): + self._genericFixerTest( + {'speedInKnots': '10'}, + {'speed': base.Speed(10 * base.MPS_PER_KNOT)}) + + + def test_magneticVariation_west(self): + self._genericFixerTest( + {'magneticVariation': '1.34', 'magneticVariationDirection': 'W'}, + {'heading': base.Heading(variation=-1.34)}) + + + def test_magneticVariation_east(self): + self._genericFixerTest( + {'magneticVariation': '1.34', 'magneticVariationDirection': 'E'}, + {'heading': base.Heading(variation=1.34)}) + + + def test_headingPlusMagneticVariation(self): + + self._genericFixerTest( + {'trueHeading': '123.12', + 'magneticVariation': '1.34', 'magneticVariationDirection': 'E'}, + {'heading': base.Heading(123.12, variation=1.34)}) + + + def test_positioningError(self): + self._genericFixerTest( + {'horizontalDilutionOfPrecision': '11'}, + {'positioningError': base.PositioningError(hdop=11.)}) + + + def test_positioningError_mixing(self): + self._genericFixerTest( + {'positionDilutionOfPrecision': '1', + 'horizontalDilutionOfPrecision': '1', + 'verticalDilutionOfPrecision': '1'}, + {'positioningError': base.PositioningError(pdop=1., hdop=1., vdop=1.)}) + + + def _genericFixerTest(self, sentenceData, expected): + sentence = nmea.NMEASentence(sentenceData) + self.adapter.sentenceReceived(sentence) + self.assertEquals(self.adapter._state, expected) + self.adapter.clear() + + + def _genericFixerRaisingTest(self, sentenceData, exceptionClass): + sentence = nmea.NMEASentence(sentenceData) + self.assertRaises(exceptionClass, + self.adapter.sentenceReceived, sentence) + self.adapter.clear() + + +class MockNMEAReceiver(base.BasePositioningReceiver): + """ + A mock NMEA receiver. + """ + def __init__(self): + self.clear() + + for methodName in ipositioning.IPositioningReceiver: + setattr(self, methodName, self._callbackForName(methodName)) + + + def clear(self): + self.called = {} + + + def _callbackForName(self, name): + def setter(**_): + self.called[name] = True + return setter + + + +class NMEAReceiverTest(TestCase): + """ + Tests for NMEAReceivers. + """ + def setUp(self): + self.receiver = MockNMEAReceiver() + self.adapter = nmea.NMEAAdapter(self.receiver) + self.protocol = nmea.NMEAProtocol(self.adapter) + + + def test_positioningErrorUpdateAcrossStates(self): + """ + Tests that the positioning error is updated across multiple states. + """ + sentences = [ + '$GPGSA,A,3,19,28,14,18,27,22,31,39,,,,,1.7,1.0,1.3*34', + '$GPGSV,3,1,11,19,03,111,00,04,15,270,00,06,01,010,00,31,06,292,00*7f', + '$GPGSV,3,2,11,28,25,170,00,14,57,208,39,18,67,296,40,39,40,246,00*7b', + '$GPGSV,3,3,11,22,42,067,42,27,14,311,43,27,05,244,00,,,,*4e', + ] + callbacksFired = set(['positioningErrorReceived', + 'beaconInformationReceived']) + + self._genericSentenceReceivedTest(sentences, callbacksFired) + + + def test_GGASentences(self): + sentences = [ + '$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47', + ] + callbacksFired = set(['positionReceived', + 'positioningErrorReceived', + 'altitudeReceived', + 'timeReceived']) + self._genericSentenceReceivedTest(sentences, callbacksFired) + + + def test_RMCSentences(self): + sentences = [ + '$GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*6A', + ] + callbacksFired = set(['headingReceived', + 'speedReceived', + 'positionReceived', + 'timeReceived']) + + self._genericSentenceReceivedTest(sentences, callbacksFired) + + + def test_GSVSentences_incompleteSequence(self): + """ + Verifies that an incomplete sequence of GSV sentences does not fire. + """ + sentences=[ + '$GPGSV,3,1,11,03,03,111,00,04,15,270,00,06,01,010,00,13,06,292,00*74', + ] + + self._genericSentenceReceivedTest(sentences, set([])) + + + def test_GSVSentences(self): + """ + Verifies that a complete sequence of GSV sentences fires. + """ + sentences=[ + '$GPGSV,3,1,11,03,03,111,00,04,15,270,00,06,01,010,00,13,06,292,00*74', + '$GPGSV,3,2,11,14,25,170,00,16,57,208,39,18,67,296,40,19,40,246,00*74', + '$GPGSV,3,3,11,22,42,067,42,24,14,311,43,27,05,244,00,,,,*4D', + ] + + callbacksFired = set(['beaconInformationReceived']) + + lambda self: self.assertNotIn( + '_partialBeaconInformation', self.adapter._state) + + + self._genericSentenceReceivedTest(sentences, callbacksFired, + beforeClearCondition=lambda self: self.assertNotIn( + '_partialBeaconInformation', self.adapter._state)) + + + + def test_GLLSentences(self): + sentences=[ + '$GPGLL,3751.65,S,14507.36,E*77', + '$GPGLL,4916.45,N,12311.12,W,225444,A*31', + ] + callbacksFired = set(['positionReceived', 'timeReceived']) + + self._genericSentenceReceivedTest(sentences, callbacksFired) + + + def test_HDTSentences(self): + sentences=[ + "$GPHDT,038.005,T*3B", + ] + callbacksFired = set(['headingReceived']) + + self._genericSentenceReceivedTest(sentences, callbacksFired) + + + def test_mixedSentences(self): + sentences = [ + '$GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*6A', + '$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47', + ] + callbacksFired = set(['altitudeReceived', + 'speedReceived', + 'positionReceived', + 'positioningErrorReceived', + 'timeReceived', + 'headingReceived']) + self._genericSentenceReceivedTest(sentences, callbacksFired) + + + def test_mixesSentences_withBeaconInformationAndVisibility(self): + sentences = [ + '$GPGSA,A,3,16,4,13,18,27,22,31,39,,,,,1.7,1.0,1.3*', + '$GPGSV,3,1,11,03,03,111,00,04,15,270,00,06,01,010,00,13,06,292,00*74', + '$GPGSV,3,2,11,14,25,170,00,16,57,208,39,18,67,296,40,19,40,246,00*74', + '$GPGSV,3,3,11,22,42,067,42,24,14,311,43,27,05,244,00,,,,*4D', + '$GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*6A', + '$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47', + '$GPGLL,4916.45,N,12311.12,W,225444,A*31', + ] + callbacksFired = set(['headingReceived', + 'beaconInformationReceived', + 'speedReceived', + 'positionReceived', + 'timeReceived', + 'altitudeReceived', + 'positioningErrorReceived']) + self._genericSentenceReceivedTest(sentences, callbacksFired) + + + def _genericSentenceReceivedTest(self, sentences, callbacksFired, + beforeClearCondition=lambda _: None): + for sentence in sentences: + self.protocol.lineReceived(sentence) + + self.assertEquals(set(self.receiver.called.keys()), callbacksFired) + + beforeClearCondition(self) + + self.receiver.clear() + self.adapter.clear()