namespace my.name.space; // ************************************************************************ // * Copyright (C)2019, International Business Machines Corporation and // * others. All Rights Reserved. // ************************************************************************ use com.ibm.streamsx.datetime::currentTimeMillis ; use com.ibm.streamsx.datetime::TimeMillis ; /** * Stream type for the structured representation of all the raw data items in * a location record, along with the document (query) timestamp. * NOTE: rstring attributes only, to avoid conversion errors. * * @exclude */ type RawVehicleLocation = rstring agency, // agency rstring nextbusId, //vehicle ID TimeMillis lastTime, rstring routeTag, // route ID rstring dirTag, // direction (inbound, outbound, etc.) rstring latitude, // in degrees rstring longitude, // in degrees rstring secsSinceReport, // seconds since last location update rstring predictable, // Is next arrival at a stop predictable? // (convert to boolean before use) rstring heading, // direction of travel, degrees from north rstring speedKmHr ; // speed in km/h /** * Parse the XML containing the vehicle locations * @input RawNextBusData the XML to parse * @output VehicleLocations stream of tuples containing the vehicle locations. */ composite ParseNextBusData(input RawNextBusData; output VehicleLocations ) { param expression $agency ; graph // Extract the structured items from each XML record. (stream RVL ; stream LastUpdate) as ExtractVehicleLocations = XMLParse(RawNextBusData) { logic state : { rstring _agency = $agency ; } param trigger : "/body/vehicle", "/body/lastTime" ; xmlInput : locationXMLDoc ; parsing : permissive ; // Recover from data errors; see below output RVL : /* * Data error handling relies on XPath and permissive parsing. * when a requested attribute is missing, XPath returns an empty * string; a numeric cast would then result in a default value * (0 or 0.0). which may or may not be a legitimate value. * If a requested attribute is present but contains garbage, a * numeric cast may fail altogether, which would make the results * for the entire tuple unpredictable and invalid, or worse. * Therefore, just get string results here and perform any numeric * conversions, with appropriate protections, in the next operator. */ nextbusId = XPath("@id"), routeTag = XPath("@routeTag"), dirTag = XPath("@dirTag"), latitude = XPath("@lat"), longitude = XPath("@lon"), secsSinceReport = XPath("@secsSinceReport"), predictable = XPath("@predictable"), heading = XPath("@heading"), speedKmHr = XPath("@speedKmHr"), agency = _agency ; LastUpdate : lastTime =(TimeMillis) XPath("@time") ; } stream RawVehicleLocations = Custom(RVL ; LastUpdate) { logic state : { mutable boolean seenVehicles = false ; mutable boolean seenTime = false ; mutable TimeMillis lt ; mutable list vehicles = [ ] ; } onTuple RVL : { appendM(vehicles, RawVehicleLocations) ; } onTuple LastUpdate : { lt = lastTime ; } onPunct LastUpdate : { if(currentPunct() != Sys.WindowMarker) return ; seenTime = true ; if(seenTime && seenVehicles) { mutable RawVehicleLocation t = { } ; for(RawVehicleLocation vehicle in vehicles) { assignFrom(t, vehicle) ; t.lastTime = lt ; submit(t, RawVehicleLocations) ; } seenTime = false ; seenVehicles = false ; clearM(vehicles) ; } } onPunct RVL : { if(currentPunct() != Sys.WindowMarker) return ; seenVehicles = true ; if(seenTime && seenVehicles) { mutable RawVehicleLocation t = { } ; for(RawVehicleLocation vehicle in vehicles) { assignFrom(t, vehicle) ; t.lastTime = lt ; submit(t, RawVehicleLocations) ; } seenTime = false ; seenVehicles = false ; clearM(vehicles) ; } } } stream VehicleLocations = Custom(RawVehicleLocations) { logic onTuple RawVehicleLocations: { // First ensure we can parse the numeric data successfully. mutable int64 heading_ = 0l; if (parseNumber(heading_, heading) == -1) return; mutable float64 speedKmHr_ = 0.0; if (parseNumber(speedKmHr_, speedKmHr) == -1) return; mutable float64 latitude_ = 0.0; if (parseNumber(latitude_, latitude) == -1) return; mutable float64 longitude_ = 0.0; if (parseNumber(longitude_, longitude) == -1) return; mutable TimeMillis secsSinceReport_ = 0l; if (parseNumber(secsSinceReport_, secsSinceReport) == -1) return; mutable VehicleLocationType v = {}; assignFrom(v, RawVehicleLocations); v.id = nextbusId + ":" + agency + ":nextbus"; v.latitude = latitude_; v.longitude = longitude_ ; v.reportTime = lastTime - (secsSinceReport_ * 1000l); submit(v, VehicleLocations); } } }