#!/usr/bin/env python3
"""
Google Search Console Data Analyzer

Analyzes GSC data to find top-performing pages and unique queries per page.
- Assigns a configurable number of unique, mutually exclusive queries per page.
- Filters pages against a sitemap.
- For sitemap pages with no/few GSC queries, attempts to assign queries based on
  URL slug matching against remaining GSC queries.
- URL normalization handles trailing slashes and other common variations.
- Saves rejected sitemap URLs for review.
- Saves unique queries and their total impressions to a separate CSV.
"""

import os
import json
from datetime import datetime, timedelta
from collections import defaultdict
import pandas as pd
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import pickle
import re
from urllib.parse import urlparse, unquote
import csv
import heapq

SCOPES = ['https://www.googleapis.com/auth/webmasters.readonly']

COMMON_STOPWORDS = {
    "the", "a", "an", "is", "are", "was", "were", "of", "in", "on", "at", "to",
    "for", "with", "by", "from", "as", "it", "its", "and", "or", "but", "if",
    "then", "com", "www", "http", "https", "html", "php", "aspx", "jsp", "blog",
    "blogs", "page", "news", "article", "articles", "using", "how", "what",
    "when", "where", "why", "org", "net", "gov", "edu", "uk", "ca", "au", "index"
}


class GSCAnalyzer:
    def __init__(self, site_url, credentials_file='credentials.json'):
        self.site_url = self.normalize_url(site_url, is_base_site=True)
        if not self.site_url:
            raise ValueError("Provided SITE_URL is invalid or not HTTP/HTTPS.")
        self.credentials_file = credentials_file
        self.service = None
        self.authenticate()

    def authenticate(self):
        # ... (same as before)
        creds = None
        if os.path.exists('token.pickle'):
            with open('token.pickle', 'rb') as token:
                creds = pickle.load(token)
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    self.credentials_file, SCOPES)
                creds = flow.run_local_server(port=0)
            with open('token.pickle', 'wb') as token:
                pickle.dump(creds, token)
        self.service = build('searchconsole', 'v1', credentials=creds)
        print("βœ… Successfully authenticated with Google Search Console")

    def normalize_url(self, url, is_base_site=False):
        # ... (same as before, ensure it's robust)
        if not url or not isinstance(url, str):
            return ""
        try:
            temp_url = url
            if "://" in url:
                scheme, rest = url.split("://", 1)
                if "/" in rest:
                    netloc, path_query_fragment = rest.split("/", 1)
                    temp_url = f"{scheme.lower()}://{netloc.lower()}/{path_query_fragment}"
                else:
                    temp_url = f"{scheme.lower()}://{rest.lower()}"
            else:
                # Handle cases like "example.com" or "example.com/path" without a scheme for the base site
                if is_base_site and not url.startswith(('http://', 'https://')):
                    if "//" in url:  # e.g. //example.com/path
                        temp_url = f"https:{url}"
                    elif "/" in url and not url.startswith("/"):  # e.g. example.com/path
                        temp_url = f"https://{url}"
                    elif "/" not in url and "." in url:  # e.g. example.com
                        temp_url = f"https://{url}"
                    # else it might be a relative path or malformed; let urlparse handle it or fail

            parsed = urlparse(temp_url)
            current_scheme = parsed.scheme
            current_netloc = parsed.netloc
            current_path = parsed.path

            if not current_scheme and current_netloc:
                # Likely parsed from example.com/path
                current_scheme = urlparse(self.site_url).scheme if hasattr(self, 'site_url') and self.site_url else "https"
            elif not current_netloc and not current_scheme:
                if is_base_site:
                    # This case should be largely covered by the temp_url logic for is_base_site,
                    # but if it still reaches here, it's likely an invalid base URL.
                    return ""
                elif hasattr(self, 'site_url') and self.site_url:
                    base_parsed = urlparse(self.site_url)
                    current_scheme = base_parsed.scheme
                    current_netloc = base_parsed.netloc
                    if not current_path.startswith('/'):
                        current_path = '/' + current_path
                else:
                    return ""

            if current_scheme not in ('http', 'https'):
                return ""
            if not current_netloc:
                return ""

            path_to_normalize = current_path if current_path else '/'
            normalized_path = path_to_normalize.rstrip('/') if path_to_normalize != '/' else '/'
            normalized_path = unquote(normalized_path)  # Decode URL-encoded characters
            # Ensure netloc is also lowercased, as parsed.netloc might not be if the scheme was missing
            return f"{current_scheme.lower()}://{current_netloc.lower()}{normalized_path}"
        except ValueError:
            return ""
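    # Illustrative normalization examples (a sketch of the behaviour of the method
    # above; hypothetical inputs, not exhaustive):
    #   normalize_url("HTTPS://Example.COM/Blog/Post/")   -> "https://example.com/Blog/Post"
    #   normalize_url("example.com", is_base_site=True)   -> "https://example.com/"
    #   normalize_url("https://example.com/a%20b/")       -> "https://example.com/a b"
    #   normalize_url("ftp://example.com/file")           -> ""   (non-HTTP schemes are rejected)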
    def save_rejected_sitemap_urls(self, rejected_urls_data):
        # ... (same as before)
        if not rejected_urls_data:
            print("ℹ️ No rejected sitemap URLs to save.")
            return
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"rejected_sitemap_urls_{timestamp}.csv"
        try:
            df = pd.DataFrame(rejected_urls_data)
            df.to_csv(filename, index=False, encoding='utf-8-sig')
            print(f"πŸ’Ύ Rejected sitemap URLs saved to: {filename}")
        except Exception as e:
            print(f"❌ Error saving rejected sitemap URLs to CSV: {e}")

    def load_sitemap_urls(self, sitemap_file="sitemap.csv"):
        # ... (same as before)
        sitemap_url_map = {}
        rejected_urls_for_saving = []
        sitemap_path = os.path.join(os.path.dirname(__file__), sitemap_file)
        if not os.path.exists(sitemap_path):
            print(f"⚠️ Sitemap file '{sitemap_path}' not found. Proceeding without sitemap pre-filtering.")
            return sitemap_url_map
        try:
            print(f"πŸ“– Loading sitemap from '{sitemap_path}'...")
            df = pd.read_csv(sitemap_path, header=None, usecols=[0], on_bad_lines='warn',
                             encoding='utf-8-sig', skipinitialspace=True, keep_default_na=False)
            if df.empty:
                print(f"⚠️ Sitemap file '{sitemap_path}' is empty or has no valid URLs in the first column.")
                return sitemap_url_map
            raw_urls_from_sitemap = df.iloc[:, 0].astype(str).tolist()
            processed_sitemap_entries = 0
            for original_url_str_raw in raw_urls_from_sitemap:
                processed_sitemap_entries += 1
                original_url_str_stripped = original_url_str_raw.strip()
                if not original_url_str_stripped:
                    rejected_urls_for_saving.append({'Original URL': original_url_str_raw,
                                                     'Reason for Rejection': 'Empty or Whitespace',
                                                     'Normalized Attempt': ''})
                    continue
                normalized_sitemap_url = self.normalize_url(original_url_str_stripped)
                if normalized_sitemap_url:
                    if normalized_sitemap_url not in sitemap_url_map:
                        sitemap_url_map[normalized_sitemap_url] = original_url_str_stripped
                    else:
                        rejected_urls_for_saving.append({'Original URL': original_url_str_stripped,
                                                         'Reason for Rejection': 'Duplicate after normalization',
                                                         'Normalized Attempt': normalized_sitemap_url})
                else:
                    rejected_urls_for_saving.append({'Original URL': original_url_str_stripped,
                                                     'Reason for Rejection': 'Normalization Failed (Invalid/Malformed/Non-HTTP)',
                                                     'Normalized Attempt': ''})
            print(f"πŸ—ΊοΈ Loaded {len(sitemap_url_map)} unique, normalized URLs from sitemap.")
            print(f" (Processed {processed_sitemap_entries} entries from the sitemap file.)")
            if not sitemap_url_map and processed_sitemap_entries > 0:
                print(f"⚠️ No valid HTTP/HTTPS URLs could be loaded from sitemap file '{sitemap_path}'. All entries rejected or empty.")
        except Exception as e:
            print(f"❌ Error loading sitemap '{sitemap_path}': {e}")
            rejected_urls_for_saving.append({'Original URL': f"ENTIRE_FILE_ERROR: {sitemap_file}",
                                             'Reason for Rejection': f"Pandas or File Read Error: {str(e)[:100]}",
                                             'Normalized Attempt': ''})
        self.save_rejected_sitemap_urls(rejected_urls_for_saving)
        return sitemap_url_map
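    # Assumed sitemap.csv layout (a sketch; the loader above reads only the first
    # column and expects no header row, so any extra columns are ignored):
    #
    #   https://example.com/
    #   https://example.com/blog/first-post/
    #   https://example.com/services/soil-testing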
    def get_search_analytics_data(self, start_date, end_date, country='usa'):
        # ... (same as before)
        print(f"πŸ“Š Fetching GSC data from {start_date} to {end_date} for {country.upper()}...")
        all_rows = []
        start_row = 0
        row_limit = 25000
        while True:
            print(f" -> Fetching rows starting from {start_row}...")
            request = {
                'startDate': start_date,
                'endDate': end_date,
                'dimensions': ['page', 'query'],
                'dimensionFilterGroups': [{'filters': [{'dimension': 'country',
                                                        'operator': 'equals',
                                                        'expression': country}]}],
                'rowLimit': row_limit,
                'startRow': start_row
            }
            try:
                response = self.service.searchanalytics().query(siteUrl=self.site_url, body=request).execute()
                rows = response.get('rows', [])
                if not rows:
                    break
                all_rows.extend(rows)
                start_row += len(rows)
                if len(rows) < row_limit:
                    break
            except Exception as e:
                print(f" -> An error occurred during API call: {e}")
                break
        if start_row > 0:
            print(f" -> Fetched {start_row} total rows from GSC.")
        return all_rows
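    # Each returned row is assumed to look roughly like the following (keys mirror
    # the 'page' and 'query' dimensions requested above; process_data() relies on
    # exactly these fields, and ctr arrives as a fraction between 0 and 1):
    #   {'keys': ['https://example.com/page', 'some query'],
    #    'clicks': 12, 'impressions': 340, 'ctr': 0.035, 'position': 8.2}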
    def process_data(self, raw_data, sitemap_normalized_urls_set=None):
        print(f"πŸ”„ Processing {len(raw_data)} GSC data rows...")
        sitemap_filtering_active = isinstance(sitemap_normalized_urls_set, set) and sitemap_normalized_urls_set
        if sitemap_filtering_active:
            print(f" -> Filtering GSC pages against {len(sitemap_normalized_urls_set)} sitemap URLs.")

        page_data = defaultdict(lambda: {'clicks': 0, 'impressions': 0, 'original_url_gsc': None})
        # MODIFIED: This new structure maps a query to all pages it appeared on, with performance metrics.
        query_performance_on_pages = defaultdict(list)
        all_query_objects_from_gsc = {}  # Kept for slug matching pool
        aggregated_unique_query_impressions = defaultdict(int)  # For sorting queries by total impressions
        skipped_sitemap_count = 0

        for row in raw_data:
            page_url_original_gsc = row['keys'][0]
            query_text = row['keys'][1]
            clicks = row['clicks']
            impressions = row['impressions']
            ctr = row['ctr']
            position = row['position']

            normalized_query_text = query_text.lower().strip()
            aggregated_unique_query_impressions[normalized_query_text] += impressions

            normalized_gsc_url = self.normalize_url(page_url_original_gsc)
            if not normalized_gsc_url:
                continue
            if sitemap_filtering_active and normalized_gsc_url not in sitemap_normalized_urls_set:
                skipped_sitemap_count += 1
                continue

            page_data[normalized_gsc_url]['clicks'] += clicks
            page_data[normalized_gsc_url]['impressions'] += impressions
            if not page_data[normalized_gsc_url]['original_url_gsc']:
                page_data[normalized_gsc_url]['original_url_gsc'] = page_url_original_gsc

            query_info = {
                'query': query_text,
                'clicks': clicks,
                'impressions': impressions,
                'ctr': ctr,
                'position': position
            }
            # MODIFIED: Populate the new data structure for the new assignment logic.
            query_performance_on_pages[normalized_query_text].append({
                'page': normalized_gsc_url,
                'impressions': impressions,
                'query_info': query_info
            })

            # Store the query_info object for the instance of the query with the highest
            # single-row impression count. This is used for the secondary slug matching logic.
            query_key_for_object_store = query_text.lower().strip()
            if query_key_for_object_store not in all_query_objects_from_gsc or \
                    impressions > all_query_objects_from_gsc[query_key_for_object_store]['impressions']:
                all_query_objects_from_gsc[query_key_for_object_store] = query_info

        if sitemap_filtering_active and skipped_sitemap_count > 0:
            print(f" -> Skipped {skipped_sitemap_count:,} GSC data entries for pages not in sitemap.")
        print(f"πŸ“ˆ Found {len(page_data):,} unique pages in GSC data (after sitemap filtering if applied).")

        sorted_all_gsc_query_objects = sorted(
            all_query_objects_from_gsc.values(),
            key=lambda x: x['impressions'],
            reverse=True
        )
        print(f"πŸ“Š Found {len(aggregated_unique_query_impressions)} unique normalized query strings in GSC data.")
        # MODIFIED: Return the new data structure instead of the old one.
        return dict(page_data), dict(query_performance_on_pages), sorted_all_gsc_query_objects, dict(aggregated_unique_query_impressions)
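    # Shape of query_performance_on_pages after processing (illustrative values only):
    #   {
    #       "soil moisture sensor": [
    #           {'page': 'https://example.com/blog/soil-sensors', 'impressions': 300, 'query_info': {...}},
    #           {'page': 'https://example.com/products',          'impressions': 45,  'query_info': {...}},
    #       ],
    #       ...
    #   }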
    def save_aggregated_queries_csv(self, aggregated_query_data, filename_prefix="gsc_unique_queries_total_impressions"):
        # ... (same as before)
        if not aggregated_query_data:
            print("ℹ️ No aggregated query data to save for the unique queries CSV.")
            return
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{filename_prefix}_{timestamp}.csv"
        query_list = [{'Query': query, 'Total Impressions': impressions}
                      for query, impressions in aggregated_query_data.items()]
        query_list_sorted = sorted(query_list, key=lambda x: x['Total Impressions'], reverse=True)
        try:
            df = pd.DataFrame(query_list_sorted)
            df.to_csv(filename, index=False, encoding='utf-8-sig')
            print(f"πŸ’Ύ Unique queries and their total impressions saved to: {filename}")
            print(f" -> This CSV contains {len(df)} unique queries.")
        except Exception as e:
            print(f"❌ Error saving unique queries CSV ('{filename}'): {e}")

    # NEW LOGIC: This method replaces the old page-first assignment logic.
    def perform_query_page_matching(self, aggregated_query_impressions, query_performance_on_pages, queries_per_page=2):
        """
        Assigns queries to pages based on query performance.
        1. Sorts all unique queries by their total impressions.
        2. For each query, finds the page where it had the highest impressions.
        3. Assigns the query to that page, if the page has not yet reached its query limit.
        4. If the top page is full, it tries the next-best page for that query, and so on.
        """
        print("πŸš€ Performing query-to-page matching with new logic...")
        print(f" -> Total unique queries to process: {len(aggregated_query_impressions)}")
        print(f" -> Page assignment limit: {queries_per_page} queries per page")

        # 1. Sort all unique queries by their total aggregated impressions, descending.
        sorted_queries = sorted(
            aggregated_query_impressions.items(),
            key=lambda item: item[1],
            reverse=True
        )

        # Keep track of assignments
        assigned_queries_map = defaultdict(list)
        page_assignment_count = defaultdict(int)
        globally_used_query_strings = set()
        queries_assigned_count = 0

        # 2. Iterate through the sorted list of queries.
        for query_text, total_impressions in sorted_queries:
            # Get all page performances for this specific query.
            page_performances = query_performance_on_pages.get(query_text, [])
            if not page_performances:
                continue

            # Sort the pages for THIS query by impressions, descending.
            # This finds the best page for the current query.
            sorted_pages_for_this_query = sorted(
                page_performances,
                key=lambda x: x['impressions'],
                reverse=True
            )

            # 3. Attempt to assign this query to its best available page.
            for performance_data in sorted_pages_for_this_query:
                page_url = performance_data['page']
                query_info_object = performance_data['query_info']

                # 4. Check if the page has room for more queries.
                if page_assignment_count[page_url] < queries_per_page:
                    # 5. Assign the query and update tracking variables.
                    assigned_queries_map[page_url].append(query_info_object)
                    page_assignment_count[page_url] += 1
                    globally_used_query_strings.add(query_text)
                    queries_assigned_count += 1
                    # Once assigned, break the inner loop and move to the next query.
                    break

        print("βœ… Query-to-page matching complete.")
        print(f" -> Total queries assigned: {queries_assigned_count}")
        print(f" -> Number of pages receiving at least one query: {len(assigned_queries_map)}")
        return assigned_queries_map, globally_used_query_strings
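    # Worked example of the query-first assignment above (hypothetical numbers,
    # queries_per_page=1): query "crop yield" has 500 impressions on page A and
    # 200 on page B; query "yield map" has 300 impressions on page A only.
    # "crop yield" is processed first (higher total impressions) and takes page A's
    # single slot; "yield map" then finds A full, has no other candidate page, and
    # stays unassigned, which leaves it available for the slug-matching pass below.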
    def extract_keywords_from_url(self, url_string):
        # ... (same as before)
        if not url_string:
            return set()
        try:
            path = urlparse(url_string).path
            path_segments = [seg for seg in re.split(r'[/_-]', path) if seg]
            keywords = set()
            for segment in path_segments:
                segment_cleaned = segment.split('.')[0]
                word = segment_cleaned.lower()
                if word and word not in COMMON_STOPWORDS and len(word) > 2:
                    keywords.add(word)
            return keywords
        except Exception:
            return set()

    def assign_queries_by_slug(self, sitemap_pages_to_fill_normalized_urls, sitemap_url_map,
                               gsc_assigned_queries_map, available_query_pool,
                               globally_used_query_strings, queries_per_page=2):
        # ... (same as before)
        print(f"🎯 Slug-based assignment: Attempting to find queries for {len(sitemap_pages_to_fill_normalized_urls)} sitemap pages...")
        slug_assigned_queries_map = defaultdict(list)
        queries_assigned_by_slug_count = 0

        for norm_page_url in sitemap_pages_to_fill_normalized_urls:
            original_sitemap_url = sitemap_url_map.get(norm_page_url, norm_page_url)
            num_already_assigned = len(gsc_assigned_queries_map.get(norm_page_url, []))
            num_needed = queries_per_page - num_already_assigned
            if num_needed <= 0:
                continue

            page_slug_keywords = self.extract_keywords_from_url(original_sitemap_url)
            if not page_slug_keywords:
                continue

            queries_found_for_this_page_by_slug = 0
            for query_info in available_query_pool:
                query_text_lower = query_info['query'].lower().strip()
                if query_text_lower in globally_used_query_strings:
                    continue
                match_found = False
                for slug_keyword in page_slug_keywords:
                    if slug_keyword in query_text_lower:
                        match_found = True
                        break
                if match_found:
                    slug_assigned_queries_map[norm_page_url].append(query_info)
                    globally_used_query_strings.add(query_text_lower)
                    queries_found_for_this_page_by_slug += 1
                    queries_assigned_by_slug_count += 1
                    if queries_found_for_this_page_by_slug >= num_needed:
                        break

        print(f"βœ… Slug-based assignment complete. Assigned {queries_assigned_by_slug_count} new queries across pages.")
        return slug_assigned_queries_map
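    # Slug-matching sketch (hypothetical URL): extract_keywords_from_url(
    # "https://example.com/blog/organic-farming-tips.html") yields
    # {"organic", "farming", "tips"}, since "blog" is a stopword and the ".html"
    # extension is stripped; any still-unassigned GSC query containing one of
    # those keywords as a substring can then be assigned to that page.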
    def generate_report(self, final_report_entries, start_date_str, end_date_str, queries_per_page):
        # ... (same as before)
        report = ["=" * 80, "πŸ† GOOGLE SEARCH CONSOLE ANALYSIS REPORT", "=" * 80]
        report.append(f"πŸ“… Analysis Period: {start_date_str} to {end_date_str}")
        report.append("🌍 Country: USA (default)")
        report.append(f"πŸ”— Total Sitemap Pages Processed: {len(final_report_entries)}")
        report.append(f"🎯 Target Queries Per Page: {queries_per_page}")
        report.extend(["=" * 80, "", "πŸ“Š A. PAGES (Sorted by GSC Clicks, then Assignment Type)", "-" * 70])

        for i, entry in enumerate(final_report_entries, 1):
            report.append(f"{i:3d}. πŸ”— {entry['original_url']}")
            report.append(f" Source: {entry['assignment_type']}")
            report.append(f" GSC Clicks: {entry['gsc_clicks']:,}, GSC Impressions: {entry['gsc_impressions']:,}")
            if entry['gsc_impressions'] > 0:
                ctr = (entry['gsc_clicks'] / entry['gsc_impressions'] * 100)
                report.append(f" GSC CTR: {ctr:.2f}%")
            report.append(f" Assigned Queries ({len(entry['assigned_queries'])}/{queries_per_page}):")
            if entry['assigned_queries']:
                for j, q_info in enumerate(entry['assigned_queries'], 1):
                    # GSC reports query CTR as a fraction (0-1), so convert it to a percentage for display.
                    report.append(f" {j:2d}. \"{q_info['query']}\" (Impr: {q_info['impressions']:,}, Clicks: {q_info['clicks']:,}, CTR: {q_info['ctr'] * 100:.2f}%, Pos: {q_info['position']:.1f})")
            else:
                report.append(" ❌ No queries assigned.")
            report.append("")
        return "\n".join(report)

    def save_to_csv(self, final_report_entries, filename_prefix="gsc_analysis", queries_per_page=2):
        # ... (same as before)
        print("πŸ’Ύ Preparing CSV data...")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_data_list = []
        for rank, entry in enumerate(final_report_entries, start=1):
            queries_str = ", ".join(q['query'] for q in entry['assigned_queries'])
            report_data_list.append({
                'Rank': rank,
                'Page URL': entry['original_url'],
                'Assignment Source': entry['assignment_type'],
                'GSC Clicks': entry['gsc_clicks'],
                'GSC Impressions': entry['gsc_impressions'],
                'GSC CTR (%)': round((entry['gsc_clicks'] / entry['gsc_impressions'] * 100), 2) if entry['gsc_impressions'] > 0 else 0,
                'Number of Assigned Queries': len(entry['assigned_queries']),
                'Assigned Queries': queries_str
            })
        print("πŸ“Š Creating DataFrame and saving to CSV...")
        report_df = pd.DataFrame(report_data_list)
        filename = f"{filename_prefix}_report_{timestamp}.csv"
        report_df.to_csv(filename, index=False, encoding='utf-8-sig')
        print(f"πŸ’Ύ CSV report saved to: {filename}")
        if not report_df.empty:
            q_counts = report_df['Number of Assigned Queries']
            print(f"πŸ“Š CSV Summary Stats: Total queries in CSV: {q_counts.sum():,}, Avg queries/page: {q_counts.mean():.1f}, Pages w/ full {queries_per_page}: {(q_counts == queries_per_page).sum():,}/{len(report_df):,}")
        else:
            print("πŸ“Š CSV is empty.")
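    # Shape of each final_report_entries item built in run_analysis() below and
    # consumed by generate_report()/save_to_csv() above (illustrative):
    #   {'normalized_url': ..., 'original_url': ..., 'gsc_clicks': int,
    #    'gsc_impressions': int, 'assigned_queries': [query_info, ...],
    #    'assignment_type': 'GSC' | 'GSC & SLUG' | 'SLUG' | 'NO_QUERIES_FOUND' | 'NONE'}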
    def run_analysis(self, days=330, save_csv=True, save_report=True,
                     sitemap_file="sitemap.csv", queries_per_page=2):
        end_date_dt = datetime.now().date()
        start_date_dt = end_date_dt - timedelta(days=days)
        start_date_str = start_date_dt.strftime('%Y-%m-%d')
        end_date_str = end_date_dt.strftime('%Y-%m-%d')

        print(f"πŸš€ Starting GSC analysis for {self.site_url}")
        print(f"πŸ“… Date range: {start_date_str} to {end_date_str}")
        print(f"🎯 Targeting up to {queries_per_page} unique queries per page.")

        sitemap_url_map = self.load_sitemap_urls(sitemap_file)
        if not sitemap_url_map:
            print("⚠️ No sitemap URLs loaded. Analysis might be limited or produce no output if sitemap is crucial.")

        try:
            raw_gsc_data = self.get_search_analytics_data(start_date_str, end_date_str)
            if not raw_gsc_data:
                print("❌ No data found from GSC for the specified period and criteria.")
                return

            sitemap_normalized_set = set(sitemap_url_map.keys()) if sitemap_url_map else None
            # MODIFIED: Unpack the new `query_performance_on_pages` data structure.
            gsc_page_data, query_performance_on_pages, all_gsc_queries_for_slug_pool, aggregated_query_impressions = \
                self.process_data(raw_gsc_data, sitemap_normalized_set)

            if aggregated_query_impressions:
                self.save_aggregated_queries_csv(aggregated_query_impressions)

            if not gsc_page_data and not sitemap_url_map:
                print("❌ No page data to analyze from GSC (and/or no sitemap). Analysis cannot proceed.")
                return

            # MODIFIED: Call the new query-first assignment logic.
            gsc_assigned_queries_map, globally_used_query_strings = \
                self.perform_query_page_matching(aggregated_query_impressions, query_performance_on_pages, queries_per_page)

            # The pool of remaining queries for slug matching is prepared.
            remaining_queries_for_slug_pool = [
                q_info for q_info in all_gsc_queries_for_slug_pool
                if q_info['query'].lower().strip() not in globally_used_query_strings
            ]

            sitemap_pages_to_fill_via_slug = []
            if sitemap_url_map:
                for norm_sitemap_url in sitemap_url_map.keys():
                    num_gsc_assigned = len(gsc_assigned_queries_map.get(norm_sitemap_url, []))
                    if num_gsc_assigned < queries_per_page:
                        sitemap_pages_to_fill_via_slug.append(norm_sitemap_url)

            slug_newly_assigned_map = {}
            if sitemap_pages_to_fill_via_slug and remaining_queries_for_slug_pool:
                slug_newly_assigned_map = self.assign_queries_by_slug(
                    sitemap_pages_to_fill_via_slug, sitemap_url_map,
                    gsc_assigned_queries_map, remaining_queries_for_slug_pool,
                    globally_used_query_strings, queries_per_page
                )
            else:
                print("ℹ️ Skipping slug-based assignment (no pages need filling or no remaining queries).")

            # The rest of the reporting logic remains the same, combining results from both assignment methods.
            final_report_entries = []
            pages_to_report_on_normalized = set(sitemap_url_map.keys()) | set(gsc_page_data.keys())

            for norm_url in pages_to_report_on_normalized:
                original_url = sitemap_url_map.get(norm_url, gsc_page_data.get(norm_url, {}).get('original_url_gsc', norm_url))
                gsc_info = gsc_page_data.get(norm_url, {'clicks': 0, 'impressions': 0})

                queries_from_gsc = gsc_assigned_queries_map.get(norm_url, [])
                queries_from_slug = slug_newly_assigned_map.get(norm_url, [])

                final_assigned_queries = list(queries_from_gsc)
                current_query_texts = {q['query'].lower().strip() for q in final_assigned_queries}
                for slug_q in queries_from_slug:
                    if len(final_assigned_queries) < queries_per_page and slug_q['query'].lower().strip() not in current_query_texts:
                        final_assigned_queries.append(slug_q)
                        current_query_texts.add(slug_q['query'].lower().strip())

                assignment_type = "NONE"
                if queries_from_gsc and queries_from_slug:
                    assignment_type = "GSC & SLUG"
                elif queries_from_gsc:
                    assignment_type = "GSC"
                elif queries_from_slug:
                    assignment_type = "SLUG"
                elif not final_assigned_queries:
                    assignment_type = "NO_QUERIES_FOUND"

                final_report_entries.append({
                    'normalized_url': norm_url,
                    'original_url': original_url,
                    'gsc_clicks': gsc_info['clicks'],
                    'gsc_impressions': gsc_info['impressions'],
                    'assigned_queries': final_assigned_queries,
                    'assignment_type': assignment_type
                })

            def sort_key(entry):
                type_priority = {"GSC": 0, "GSC & SLUG": 1, "SLUG": 2, "NO_QUERIES_FOUND": 3, "NONE": 4}
                return (-entry['gsc_clicks'], -entry['gsc_impressions'],
                        type_priority.get(entry['assignment_type'], 99), entry['original_url'])

            final_report_entries.sort(key=sort_key)

            report_text = self.generate_report(final_report_entries, start_date_str, end_date_str, queries_per_page)
            print("\n" + report_text.split("=" * 80)[2][:500] + "...\n")  # Print first part of report summary

            if save_csv:
                self.save_to_csv(final_report_entries, queries_per_page=queries_per_page)
            if save_report:
                report_filename = f"gsc_analysis_full_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
                with open(report_filename, 'w', encoding='utf-8') as f:
                    f.write(report_text)
                print(f"πŸ’Ύ Full text report saved to: {report_filename}")
            print("\nβœ… Analysis completed successfully!")
        except Exception as e:
            print(f"❌ Error during analysis run: {str(e)}")
            import traceback
            traceback.print_exc()
def main():
    # ... (same as before)
    # Be sure to set your Site URL and ensure credentials.json and sitemap.csv are present.
    SITE_URL = 'https://farmonaut.com/'  # Replace with your target domain property URL
    CREDENTIALS_FILE = 'credentials.json'
    SITEMAP_FILE = 'sitemap.csv'  # Ensure this file is in the same directory or provide the full path
    QUERIES_PER_PAGE = 3  # Set your desired number of queries per page.

    if 'yourwebsite.com' in SITE_URL or SITE_URL == 'https://example.com/' or not SITE_URL.startswith(('http://', 'https://')):
        print("❌ Please update SITE_URL in main() with your actual, full website URL (e.g., https://example.com/).")
        return
    if not os.path.exists(CREDENTIALS_FILE):
        print(f"❌ Credentials file '{CREDENTIALS_FILE}' not found. Follow setup instructions.")
        return

    sitemap_full_path = os.path.join(os.path.dirname(__file__), SITEMAP_FILE)
    if not os.path.exists(sitemap_full_path):
        print(f"⚠️ Sitemap file '{sitemap_full_path}' not found. Analysis will run but sitemap-specific features will be limited.")

    try:
        analyzer = GSCAnalyzer(SITE_URL, CREDENTIALS_FILE)
        analyzer.run_analysis(days=330, sitemap_file=SITEMAP_FILE, queries_per_page=QUERIES_PER_PAGE)
    except Exception as e:
        print(f"❌ Failed to run analysis: {str(e)}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
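# Example of driving the analyzer directly instead of via main() (a sketch with
# assumed parameter values; requires the same credentials.json / sitemap.csv setup):
#
#   analyzer = GSCAnalyzer('https://farmonaut.com/', 'credentials.json')
#   analyzer.run_analysis(days=90, sitemap_file='sitemap.csv', queries_per_page=5)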