#!/usr/bin/env python3 import cv2 import depthai as dai from collections import deque class FeatureTrackerDrawer: lineColor = (200, 0, 200) pointColor = (0, 0, 255) circleRadius = 2 maxTrackedFeaturesPathLength = 30 # for how many frames the feature is tracked trackedFeaturesPathLength = 10 trackedIDs = None trackedFeaturesPath = None def onTrackBar(self, val): FeatureTrackerDrawer.trackedFeaturesPathLength = val pass def trackFeaturePath(self, features): newTrackedIDs = set() for currentFeature in features: currentID = currentFeature.id newTrackedIDs.add(currentID) if currentID not in self.trackedFeaturesPath: self.trackedFeaturesPath[currentID] = deque() path = self.trackedFeaturesPath[currentID] path.append(currentFeature.position) while(len(path) > max(1, FeatureTrackerDrawer.trackedFeaturesPathLength)): path.popleft() self.trackedFeaturesPath[currentID] = path featuresToRemove = set() for oldId in self.trackedIDs: if oldId not in newTrackedIDs: featuresToRemove.add(oldId) for id in featuresToRemove: self.trackedFeaturesPath.pop(id) self.trackedIDs = newTrackedIDs def drawFeatures(self, img): cv2.setTrackbarPos(self.trackbarName, self.windowName, FeatureTrackerDrawer.trackedFeaturesPathLength) for featurePath in self.trackedFeaturesPath.values(): path = featurePath for j in range(len(path) - 1): src = (int(path[j].x), int(path[j].y)) dst = (int(path[j + 1].x), int(path[j + 1].y)) cv2.line(img, src, dst, self.lineColor, 1, cv2.LINE_AA, 0) j = len(path) - 1 cv2.circle(img, (int(path[j].x), int(path[j].y)), self.circleRadius, self.pointColor, -1, cv2.LINE_AA, 0) def __init__(self, trackbarName, windowName): self.trackbarName = trackbarName self.windowName = windowName cv2.namedWindow(windowName) cv2.createTrackbar(trackbarName, windowName, FeatureTrackerDrawer.trackedFeaturesPathLength, FeatureTrackerDrawer.maxTrackedFeaturesPathLength, self.onTrackBar) self.trackedIDs = set() self.trackedFeaturesPath = dict() # Create pipeline pipeline = dai.Pipeline() # Define sources and outputs monoLeft = pipeline.create(dai.node.MonoCamera) monoRight = pipeline.create(dai.node.MonoCamera) featureTrackerLeft = pipeline.create(dai.node.FeatureTracker) featureTrackerRight = pipeline.create(dai.node.FeatureTracker) xoutPassthroughFrameLeft = pipeline.create(dai.node.XLinkOut) xoutTrackedFeaturesLeft = pipeline.create(dai.node.XLinkOut) xoutPassthroughFrameRight = pipeline.create(dai.node.XLinkOut) xoutTrackedFeaturesRight = pipeline.create(dai.node.XLinkOut) xinTrackedFeaturesConfig = pipeline.create(dai.node.XLinkIn) xoutPassthroughFrameLeft.setStreamName("passthroughFrameLeft") xoutTrackedFeaturesLeft.setStreamName("trackedFeaturesLeft") xoutPassthroughFrameRight.setStreamName("passthroughFrameRight") xoutTrackedFeaturesRight.setStreamName("trackedFeaturesRight") xinTrackedFeaturesConfig.setStreamName("trackedFeaturesConfig") # Properties monoLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P) monoLeft.setCamera("left") monoRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P) monoRight.setCamera("right") # Linking monoLeft.out.link(featureTrackerLeft.inputImage) featureTrackerLeft.passthroughInputImage.link(xoutPassthroughFrameLeft.input) featureTrackerLeft.outputFeatures.link(xoutTrackedFeaturesLeft.input) xinTrackedFeaturesConfig.out.link(featureTrackerLeft.inputConfig) monoRight.out.link(featureTrackerRight.inputImage) featureTrackerRight.passthroughInputImage.link(xoutPassthroughFrameRight.input) featureTrackerRight.outputFeatures.link(xoutTrackedFeaturesRight.input) xinTrackedFeaturesConfig.out.link(featureTrackerRight.inputConfig) # By default the least mount of resources are allocated # increasing it improves performance numShaves = 2 numMemorySlices = 2 featureTrackerLeft.setHardwareResources(numShaves, numMemorySlices) featureTrackerRight.setHardwareResources(numShaves, numMemorySlices) featureTrackerConfig = featureTrackerRight.initialConfig.get() print("Press 's' to switch between Lucas-Kanade optical flow and hardware accelerated motion estimation!") # Connect to device and start pipeline with dai.Device(pipeline) as device: # Output queues used to receive the results passthroughImageLeftQueue = device.getOutputQueue("passthroughFrameLeft", 8, False) outputFeaturesLeftQueue = device.getOutputQueue("trackedFeaturesLeft", 8, False) passthroughImageRightQueue = device.getOutputQueue("passthroughFrameRight", 8, False) outputFeaturesRightQueue = device.getOutputQueue("trackedFeaturesRight", 8, False) inputFeatureTrackerConfigQueue = device.getInputQueue("trackedFeaturesConfig") leftWindowName = "left" leftFeatureDrawer = FeatureTrackerDrawer("Feature tracking duration (frames)", leftWindowName) rightWindowName = "right" rightFeatureDrawer = FeatureTrackerDrawer("Feature tracking duration (frames)", rightWindowName) while True: inPassthroughFrameLeft = passthroughImageLeftQueue.get() passthroughFrameLeft = inPassthroughFrameLeft.getFrame() leftFrame = cv2.cvtColor(passthroughFrameLeft, cv2.COLOR_GRAY2BGR) inPassthroughFrameRight = passthroughImageRightQueue.get() passthroughFrameRight = inPassthroughFrameRight.getFrame() rightFrame = cv2.cvtColor(passthroughFrameRight, cv2.COLOR_GRAY2BGR) trackedFeaturesLeft = outputFeaturesLeftQueue.get().trackedFeatures leftFeatureDrawer.trackFeaturePath(trackedFeaturesLeft) leftFeatureDrawer.drawFeatures(leftFrame) trackedFeaturesRight = outputFeaturesRightQueue.get().trackedFeatures rightFeatureDrawer.trackFeaturePath(trackedFeaturesRight) rightFeatureDrawer.drawFeatures(rightFrame) # Show the frame cv2.imshow(leftWindowName, leftFrame) cv2.imshow(rightWindowName, rightFrame) key = cv2.waitKey(1) if key == ord('q'): break elif key == ord('s'): if featureTrackerConfig.motionEstimator.type == dai.FeatureTrackerConfig.MotionEstimator.Type.LUCAS_KANADE_OPTICAL_FLOW: featureTrackerConfig.motionEstimator.type = dai.FeatureTrackerConfig.MotionEstimator.Type.HW_MOTION_ESTIMATION print("Switching to hardware accelerated motion estimation") else: featureTrackerConfig.motionEstimator.type = dai.FeatureTrackerConfig.MotionEstimator.Type.LUCAS_KANADE_OPTICAL_FLOW print("Switching to Lucas-Kanade optical flow") cfg = dai.FeatureTrackerConfig() cfg.set(featureTrackerConfig) inputFeatureTrackerConfigQueue.send(cfg)