#!/usr/bin/env python3
"""
Hybrid Selenium Shop Finder - Combines speed with reliability
Extracts basic info from search results, then clicks for details only when needed
"""

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.common.exceptions import TimeoutException, StaleElementReferenceException
import pandas as pd
from datetime import datetime
import time
import re
import os
from pathlib import Path


class HybridShopFinder:
    """Scrapes Google Maps search results for shops near a zipcode.

    Collects result links quickly from the list view, then clicks into each
    place to extract detailed fields (address, phone, website, hours, rating).
    """

    def __init__(self, headless=False):
        """Initialize with automatic driver installation.

        Args:
            headless: run Chrome without a visible window when True.
        """
        print("Setting up Chrome driver...")
        options = webdriver.ChromeOptions()
        # Reduce automation fingerprinting so Maps serves the normal UI.
        options.add_argument('--disable-blink-features=AutomationControlled')
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option('useAutomationExtension', False)
        if headless:
            options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--window-size=1920,1080')
        # Force English so day names / aria-labels match the parsing below.
        options.add_argument('--lang=en-US')
        service = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(service=service, options=options)
        print("✅ Chrome driver ready!")

    def clean_text(self, text):
        """Remove special characters and clean text.

        Strips non-ASCII glyphs (Maps icon fonts render as garbage in CSV),
        collapses whitespace, and removes stray separator characters.
        """
        if not text:
            return ''
        # Remove common Google Maps icons and special characters.
        # These appear as garbled text in CSV.
        cleaned = re.sub(r'[^\x00-\x7F]+', ' ', text)  # Remove non-ASCII
        cleaned = re.sub(r'\s+', ' ', cleaned)  # Multiple spaces to single
        cleaned = cleaned.strip()
        # Remove common prefixes/separators that survive the ASCII filter.
        prefixes_to_remove = [',', 'î ‹', '·', '›', '‹', '»', '«']
        for prefix in prefixes_to_remove:
            cleaned = cleaned.replace(prefix, '').strip()
        return cleaned

    def format_phone(self, phone):
        """Format a phone number as (NNN) NNN-NNNN or +1 (NNN) NNN-NNNN.

        Returns the cleaned input unchanged when it is not a recognizable
        10- or 11-digit US number.
        """
        if not phone:
            return ''
        # Clean the phone first
        phone = self.clean_text(phone)
        digits = re.sub(r'\D', '', phone)
        if len(digits) == 10:
            return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
        elif len(digits) == 11 and digits[0] == '1':
            return f"+1 ({digits[1:4]}) {digits[4:7]}-{digits[7:]}"
        return phone

    def search_and_extract(self, search_term, zipcode, max_shops=20):
        """Search Google Maps and extract shop details with a hybrid approach.

        Phase 1 scrolls the result feed collecting place links; phase 2
        clicks each link and scrapes the detail panel.

        Args:
            search_term: free-text query, e.g. "smoke shop".
            zipcode: 5-digit US zipcode appended to the query.
            max_shops: cap on the number of places processed.

        Returns:
            list of dicts with keys: search_term, name, address, phone,
            website, rating, num_ratings, hours.
        """
        results = []

        # Go to Google Maps
        url = f'https://www.google.com/maps/search/{search_term}+near+{zipcode}'
        self.driver.get(url)
        time.sleep(4)

        # Wait for results
        try:
            WebDriverWait(self.driver, 15).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, '[role="feed"]'))
            )
            time.sleep(2)  # Extra wait for results to settle
        except TimeoutException:
            print(f"No results found for {search_term}")
            return results

        # Phase 1: collect all shop links and basic info from the feed.
        shop_data = []
        seen_hrefs = set()  # O(1) dedup instead of scanning shop_data each time
        scroll_attempts = 0
        while scroll_attempts < 3:
            # Find all shop links
            shop_elements = self.driver.find_elements(
                By.CSS_SELECTOR, 'div[role="feed"] a[href*="/maps/place/"]')
            print(f" Found {len(shop_elements)} shop links")

            for elem in shop_elements:
                try:
                    # Get basic info from search results
                    href = elem.get_attribute('href')
                    aria_label = elem.get_attribute('aria-label') or ''

                    # Skip if already processed
                    if href in seen_hrefs:
                        continue
                    seen_hrefs.add(href)

                    # Get parent container text for later reference.
                    parent = elem.find_element(By.XPATH, './ancestor::div[@jsaction]')
                    parent_text = parent.text

                    # NOTE: do not store the WebElement itself — it goes stale
                    # after navigation; elements are re-found by href below.
                    shop_data.append({
                        'href': href,
                        'name': aria_label,
                        'search_text': parent_text,
                    })
                except Exception:
                    continue

            if len(shop_data) >= max_shops:
                break

            # Scroll for more
            try:
                feed = self.driver.find_element(By.CSS_SELECTOR, '[role="feed"]')
                self.driver.execute_script(
                    "arguments[0].scrollTop = arguments[0].scrollHeight", feed)
                time.sleep(2)
                scroll_attempts += 1
            except Exception:
                break

        print(f" Collected {len(shop_data)} shops to process")

        # Phase 2: click into each shop for detailed info.
        for idx, shop in enumerate(shop_data[:max_shops]):
            try:
                print(
                    f" Extracting details for shop {idx + 1}/{min(len(shop_data), max_shops)}: {shop['name'][:30]}...",
                    end='\r')

                # Re-find the element to avoid stale reference
                elements = self.driver.find_elements(
                    By.CSS_SELECTOR, f'a[href="{shop["href"]}"]')
                if not elements:
                    continue

                # Click the shop
                elements[0].click()
                time.sleep(2)

                # Extract detailed info
                info = {
                    'search_term': search_term,
                    'name': self.clean_text(shop['name']),
                    'address': '',
                    'phone': '',
                    'website': '',
                    'rating': '',
                    'num_ratings': 0,
                    'hours': ''
                }

                # Wait for details panel
                try:
                    WebDriverWait(self.driver, 5).until(
                        EC.presence_of_element_located((By.CSS_SELECTOR, 'div[role="main"]'))
                    )
                except Exception:
                    pass

                # Get all info buttons/divs
                info_elements = self.driver.find_elements(
                    By.CSS_SELECTOR,
                    'button[data-item-id], div[data-item-id], a[data-item-id]')

                for elem in info_elements:
                    try:
                        aria_label = (elem.get_attribute('aria-label') or '').lower()
                        text = elem.text.strip()
                        if not text:
                            continue

                        # Phone
                        if 'phone' in aria_label or 'call' in aria_label:
                            phone_match = re.search(r'[\d\s\-\(\)\+\.]+', text)
                            if phone_match:
                                info['phone'] = self.format_phone(phone_match.group())
                        # Address
                        elif 'address' in aria_label:
                            info['address'] = self.clean_text(text.replace('\n', ', '))
                        # Website
                        elif 'website' in aria_label or '.com' in text or 'http' in text:
                            info['website'] = self.clean_text(text)
                        # Hours
                        elif any(day in text for day in
                                 ['Monday', 'Tuesday', 'Wednesday', 'Thursday',
                                  'Friday', 'Saturday', 'Sunday']):
                            info['hours'] = self.clean_text(text.replace('\n', '; '))
                    except Exception:
                        continue

                # Get rating - try multiple selectors
                rating_selectors = [
                    'span[role="img"][aria-label*="stars"]',
                    'span[aria-label*="stars"]',
                    'div[aria-label*="stars"]',
                    'span.MW4etd'  # Common rating class
                ]

                for selector in rating_selectors:
                    try:
                        rating_elem = self.driver.find_element(By.CSS_SELECTOR, selector)
                        aria_label = rating_elem.get_attribute('aria-label') or ''

                        # Extract rating
                        rating_match = re.search(r'([\d\.]+)\s*star', aria_label, re.IGNORECASE)
                        if rating_match:
                            info['rating'] = float(rating_match.group(1))

                        # Extract reviews
                        reviews_match = re.search(r'(\d+[\d,]*)\s*review', aria_label, re.IGNORECASE)
                        if reviews_match:
                            info['num_ratings'] = int(reviews_match.group(1).replace(',', ''))

                        if info['rating']:
                            break
                    except Exception:
                        continue

                # Add to results
                if info['name']:
                    results.append(info)

                # Go back to search results
                self.driver.back()
                time.sleep(1.5)

            except Exception as e:
                print(f"\n Error extracting shop {idx + 1}: {str(e)[:50]}")
                # Try to recover by reloading the search page.
                try:
                    self.driver.get(url)
                    time.sleep(3)
                except Exception:
                    pass
                continue

        print(f"\n Successfully extracted {len(results)} shops with details")
        return results

    def search_all_shops(self, zipcode):
        """Search for all shop types and return a deduplicated DataFrame.

        Runs search_and_extract once per term; shops are deduplicated by
        case-insensitive name across all searches.
        """
        search_terms = [
            "smoke shop", "vape shop", "CBD store", "hemp shop",
            "head shop", "tobacco shop", "kratom shop", "delta 8 shop",
            "smoke and vape", "hookah shop", "pipe shop"
        ]

        all_results = []
        seen_names = set()

        for i, term in enumerate(search_terms):
            print(f"\nSearching: {term} ({i + 1}/{len(search_terms)})")

            try:
                shops = self.search_and_extract(term, zipcode)

                # Add only new shops
                new_shops = 0
                for shop in shops:
                    # Normalize name for comparison
                    normalized_name = shop['name'].lower().strip()
                    if normalized_name and normalized_name not in seen_names:
                        all_results.append(shop)
                        seen_names.add(normalized_name)
                        new_shops += 1

                print(f" Added {new_shops} new unique shops")
                print(f" Total unique shops: {len(all_results)}")

            except Exception as e:
                print(f"Error searching for {term}: {str(e)[:100]}...")
                continue

        return pd.DataFrame(all_results)

    def close(self):
        """Close browser"""
        if self.driver:
            self.driver.quit()


def main():
    """Interactive entry point: prompt for zipcode, scrape, save CSV."""
    print("=" * 60)
    print("HYBRID SELENIUM SHOP FINDER")
    print("=" * 60)
    print("\nThis version combines speed with reliability.")
    print("It clicks into shops only to get detailed information.\n")

    zipcode = input("Enter zipcode: ").strip()
    if not re.match(r'^\d{5}$', zipcode):
        print("Please enter a valid 5-digit zipcode")
        return

    headless = input("\nRun in background mode? (y/n, default=n): ").lower() == 'y'

    print("\nStarting hybrid extraction...")
    print("This should take 5-10 minutes.\n")

    finder = HybridShopFinder(headless=headless)

    try:
        # Search all shop types
        results_df = finder.search_all_shops(zipcode)

        if not results_df.empty:
            # Sort descending by reviews then rating. The rating column mixes
            # floats and '' (missing), so coerce to numeric for the sort key
            # to avoid a TypeError on object-dtype comparison.
            results_df = results_df.sort_values(
                ['num_ratings', 'rating'], ascending=False,
                key=lambda s: pd.to_numeric(s, errors='coerce'))

            # Save to Documents
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"shop_data_{zipcode}_{timestamp}.csv"

            # Get Documents path (works for any user)
            documents = Path.home() / "Documents"
            filepath = documents / filename

            # Ensure all columns exist
            columns = ['name', 'search_term', 'address', 'phone', 'website',
                       'rating', 'num_ratings', 'hours']
            for col in columns:
                if col not in results_df.columns:
                    results_df[col] = ''

            # Save with UTF-8 encoding to handle any special characters
            results_df[columns].to_csv(filepath, index=False, encoding='utf-8-sig')

            print(f"\n✅ Success! Found {len(results_df)} unique shops")
            print(f"✅ Data saved to: {filepath}")
            print(f"✅ File saved to your Documents as: {filename}")

            # Show summary
            print("\nTop 15 shops by reviews:")
            print("-" * 80)
            display_cols = ['name', 'phone', 'rating', 'num_ratings']
            print(results_df[display_cols].head(15).to_string(index=False))

            print("\nShops by type:")
            print(results_df['search_term'].value_counts())

            # Data quality
            print("\nData completeness:")
            total = len(results_df)
            print(
                f"- With phone: {(results_df['phone'] != '').sum()} ({(results_df['phone'] != '').sum() / total * 100:.0f}%)")
            print(
                f"- With address: {(results_df['address'] != '').sum()} ({(results_df['address'] != '').sum() / total * 100:.0f}%)")
            print(
                f"- With website: {(results_df['website'] != '').sum()} ({(results_df['website'] != '').sum() / total * 100:.0f}%)")
            print(
                f"- With hours: {(results_df['hours'] != '').sum()} ({(results_df['hours'] != '').sum() / total * 100:.0f}%)")
            print(
                f"- With ratings: {(results_df['rating'] != '').sum()} ({(results_df['rating'] != '').sum() / total * 100:.0f}%)")
        else:
            print("\n❌ No shops found.")

    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()

    finally:
        finder.close()
        print("\n✅ Browser closed.")


if __name__ == "__main__":
    main()