from openai import OpenAI import anthropic from tqdm import tqdm from urllib.parse import urlparse from requests.auth import HTTPBasicAuth import mimetypes import ast from PIL import Image import re import unicodedata from eventregistry import * import json from fuzzywuzzy import fuzz from datetime import datetime, timedelta import time def is_similar_title(title1, title2, threshold=70): """ Check if two titles are similar based on a similarity threshold. Args: title1 (str): First title. title2 (str): Second title. threshold (int): Similarity threshold (default 85%). Returns: bool: True if titles are similar, False otherwise. """ return fuzz.token_sort_ratio(title1, title2) >= threshold def get_news_articles(date_start, date_end, max_items=100): """ Fetch news articles based on location and date range, filtering out similar titles. Args: location_uri (str): The URI of the location (e.g., country). date_start (str): The start date for fetching articles (format: YYYY-MM-DD). date_end (str): The end date for fetching articles (format: YYYY-MM-DD). max_items (int): Maximum number of articles to fetch. Default is 100. Returns: list: A list of unique or sufficiently distinct articles fetched based on the query. """ # Define the query with input parameters query = { "$query": { "$and": [ { "$or": [ {"conceptUri": "http://en.wikipedia.org/wiki/Agriculture"}, {"conceptUri": "http://en.wikipedia.org/wiki/Farming"}, { "conceptUri": "http://en.wikipedia.org/wiki/Forestry" }, { "conceptUri": "http://en.wikipedia.org/wiki/Agriculture" }, { "conceptUri": "http://en.wikipedia.org/wiki/Organic_farming" }, { "conceptUri": "http://en.wikipedia.org/wiki/Fertilizer" }, { "conceptUri": "http://en.wikipedia.org/wiki/Agribusiness" } ] }, {"categoryUri": "dmoz/Business/Agriculture_and_Forestry"}, {"$or":[{ "locationUri": "http://en.wikipedia.org/wiki/United_States" }, { "locationUri": "http://en.wikipedia.org/wiki/United_Kingdom" }, { "locationUri": "http://en.wikipedia.org/wiki/Australia" }, { "locationUri": "http://en.wikipedia.org/wiki/Europe" }, { "locationUri": "http://en.wikipedia.org/wiki/Canada" }, { "locationUri": "http://en.wikipedia.org/wiki/Africa" }, { "locationUri": "http://en.wikipedia.org/wiki/South_America" }, { "locationUri": "http://en.wikipedia.org/wiki/Middle_East" }, { "locationUri": "http://en.wikipedia.org/wiki/Asia" }]}, { "dateStart": date_start, # Use the input dateStart "dateEnd": date_end, # Use the input dateEnd }, ] } } # Initialize query q = QueryArticlesIter.initWithComplexQuery(query) # List to store unique/sufficiently distinct articles articles_list = [] seen_titles = [] # List to track seen titles (for fuzzy matching) # Fetch and append unique articles to the list for article in q.execQuery(er, maxItems=max_items): title = article.get("title", None) if title: # Check for duplicates or highly similar titles using fuzzy matching if not any( is_similar_title(title, seen_title) for seen_title in seen_titles ): articles_list.append(article) seen_titles.append(title) # Track the title for similarity comparison return articles_list def create_slug(title): # Normalize the string and remove accents title = ( unicodedata.normalize("NFKD", title).encode("ascii", "ignore").decode("utf-8") ) # Convert to lowercase title = title.lower() # Remove special characters and replace spaces with hyphens title = re.sub( r"[^a-z0-9\s-]", "", title ) # Keep only alphanumeric characters, spaces, and hyphens title = re.sub(r"\s+", "-", title).strip( "-" ) # Replace spaces with hyphens and trim leading/trailing hyphens return title def string_to_array(string): # Use ast.literal_eval to safely evaluate the string as a list try: array = ast.literal_eval(string) print(array) except: array = string return array def call_openai(prompt, temperature, max_tokens): ## Initialize OpenAi api_key = "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" api_key = "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" client = OpenAI(api_key=api_key) completion = client.chat.completions.create( model="gpt-4o-mini-2024-07-18", messages=[ { "role": "system", "content": "You are a expert in SEO and a representative of Farmonaut.", }, {"role": "user", "content": prompt}, ], max_tokens=max_tokens, temperature=temperature, ) return completion.choices[0].message.content def call_genai(prompt, temperature, max_tokens): client = anthropic.Anthropic( # defaults to os.environ.get("ANTHROPIC_API_KEY") api_key="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", ) message = client.messages.create( model="claude-3-5-sonnet-20240620", max_tokens=max_tokens, temperature=temperature, system="You are an SEO expert, a gis/ remote sensing expert, an agriculture and horticulture scientist, and a representative of Farmonaut (farmonaut.com).", messages=[{"role": "user", "content": prompt}], ) # print(message) return message.content[0].text def ai_image_url(prompt): ## Initialize OpenAi api_key = "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" api_key = "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" client = OpenAI(api_key=api_key) response = client.images.generate( model="dall-e-3", prompt=prompt, size="1024x1024", quality="standard", n=1, ) image_url = response.data[0].url return image_url def get_file_extension(url): # Parse the URL parsed_url = urlparse(url) # Get the path component of the URL path = parsed_url.path # Extract the file extension file_extension = os.path.splitext(path)[1] # Return the extension (without the dot) or an empty string if there's no extension return file_extension[1:] if file_extension else "" def add_watermark(main_image_path, watermark_path): # Open the main image main_image = Image.open(main_image_path).convert("RGBA") # Open the watermark image watermark = Image.open(watermark_path).convert("RGBA") # Calculate the new size for the watermark new_height = int(main_image.height * 0.1) aspect_ratio = watermark.width / watermark.height new_width = int(new_height * aspect_ratio) # Resize the watermark watermark = watermark.resize((new_width, new_height), Image.LANCZOS) # Calculate the position for the watermark (bottom right) # position = (main_image.width - watermark.width, main_image.height - watermark.height) position = (main_image.width - watermark.width, 0) # Create a new transparent image the same size as the main image transparent = Image.new("RGBA", main_image.size, (0, 0, 0, 0)) # Paste the watermark onto the transparent image transparent.paste(watermark, position, watermark) # Combine the main image with the watermark output = Image.alpha_composite(main_image, transparent) # Convert back to the original mode if it wasn't RGBA original_image = Image.open(main_image_path) if original_image.mode != "RGBA": output = output.convert(original_image.mode) # Save the result, overwriting the original image output.save(main_image_path) print(f"Watermark added to {main_image_path}") def convert_png_to_jpg(file_path): # Check if the file exists and is a PNG if not os.path.isfile(file_path) or not file_path.lower().endswith(".png"): raise ValueError("The provided file is not a valid PNG file.") # Open the PNG image with Image.open(file_path) as img: # Get the file name without extension file_name = os.path.splitext(file_path)[0] # Convert to RGB if the image has an alpha channel if img.mode in ("RGBA", "LA") or ( img.mode == "P" and "transparency" in img.info ): img = img.convert("RGB") # Save as JPG jpg_path = f"{file_name}.jpg" img.save(jpg_path, "JPEG") # Remove the original PNG file os.remove(file_path) print(f"Converted {file_path} to {jpg_path} and removed the original PNG.") def upload_media_to_wordpress(file_path, title): # WordPress credentials wp_url = "https://www.farmonaut.com" wp_username = "ankuromar296" wp_password = "Tjat A2hz 9XMv pXJi YbV0 GR8o" endpoint = f"{wp_url}/wp-json/wp/v2/media" auth = HTTPBasicAuth(wp_username, wp_password) mime_type, _ = mimetypes.guess_type(file_path) media_data = {"alt_text": title, "caption": title, "description": title} upload_name = f"{title}_{os.path.basename(file_path)}" with open(file_path, "rb") as file: files = {"file": (upload_name, file, mime_type)} # files = {'file': (os.path.basename(file_path), file, mime_type)} response = requests.post(endpoint, files=files, auth=auth, json=media_data) if response.status_code == 201: return response.json()["id"], response.json()["source_url"] else: print(f"Failed to upload media. Status code: {response.status_code}") print(f"Response: {response.text}") return None, None def publish_or_update_wordpress_post(post_data): # WordPress credentials wp_url = "https://www.farmonaut.com" wp_username = "ankuromar296" wp_password = "Tjat A2hz 9XMv pXJi YbV0 GR8o" # WordPress REST API endpoints posts_url = f"{wp_url}/wp-json/wp/v2/posts" # Set up authentication auth = HTTPBasicAuth(wp_username, wp_password) # Check if the API is accessible try: response = requests.get(f"{wp_url}/wp-json", auth=auth) response.raise_for_status() except requests.exceptions.RequestException as e: raise Exception(f"Failed to access WordPress API: {str(e)}") # Check if a post with the same title exists # existing_post = check_existing_post_by_title(site_url, auth, post_data['title']) # if existing_post: # # Update existing post # update_url = f"{posts_url}/{existing_post['id']}" # post_data['id'] = existing_post['id'] # response = requests.post(update_url, json=post_data, auth=auth) # else: # Create new post response = requests.post(posts_url, json=post_data, auth=auth) if response.status_code in [200, 201]: return response.json() else: raise Exception(f"Failed to publish/update post: {response.text}") def process_media2(media_url_arr, title): # print(media_url_arr) media_info = [] media_num = 0 folder_name = "insta_files_news" for url in media_url_arr: # print(url) media_num = media_num + 1 file_path = ( folder_name + "/" + str(media_num) + "." + str(get_file_extension(url)) ) try: response = requests.get(url, stream=True) except: print(traceback.format_exc()) response = None if response is None: continue os.makedirs(os.path.dirname(file_path), exist_ok=True) if response.status_code == 200: with open(file_path, "wb") as file: for chunk in response.iter_content(chunk_size=8192): if chunk: file.write(chunk) print(f"File downloaded successfully: {file_path}") add_watermark(file_path, "watermark.jpg") convert_png_to_jpg(file_path) else: print(f"Failed to download file. Status code: {response.status_code}") # with tempfile.TemporaryDirectory() as tmpdir: # for media_url in media_url_arr: # L.download_post(post, target=tmpdir) media_files = os.listdir(folder_name) for file in media_files: file_path = os.path.join(folder_name, file) media_id, media_url = upload_media_to_wordpress(file_path, title) if media_id and media_url: media_info.append( {"type": "image", "text": title, "id": media_id, "url": media_url} ) else: continue # Skip files that are neither videos nor images return media_info def create_breadcrumbs_fromURL(parsed_url): # Split the path into segments path_segments = parsed_url.path.strip("/").split("/") # Split by '/' # Construct breadcrumb items breadcrumbs = {"@type": "BreadcrumbList", "itemListElement": []} # Add the home item breadcrumbs["itemListElement"].append( { "@type": "ListItem", "position": 1, "name": "Home", "item": f"{parsed_url.scheme}://{parsed_url.netloc}/", # Base URL } ) # Add items for each segment in the path for position, segment in enumerate(path_segments, start=2): # Start at position 2 breadcrumb_item = { "@type": "ListItem", "position": position, "name": segment.replace( "-", " " ).title(), # Convert hyphens to spaces and capitalize "item": f"{parsed_url.scheme}://{parsed_url.netloc}/{'/'.join(path_segments[:position-1])}", # Construct the URL up to this segment } breadcrumbs["itemListElement"].append(breadcrumb_item) return breadcrumbs def create_news_schema(title, media_url_arr, blog_url): # Extract image URLs image_urls = list(media_url_arr) ## make sure it is a list # Organizatio details organization = ( { "@type": "Organization", "name": "Farmonaut", "url": "https://farmonaut.com", "sameAs": [ "https://www.facebook.com/farmonaut", "https://twitter.com/farmonaut", "https://www.linkedin.com/company/farmonaut", "https://www.instagram.com/farmonaut", # Add other social media links as necessary ], }, ) breadcrumbs = create_breadcrumbs_fromURL(urlparse(blog_url)) # Get current timestamp in ISO 8601 format current_timestamp = datetime.utcnow().isoformat() + "Z" return { "@context": "https://schema.org", "@type": "NewsArticle", "headline": title, "image": image_urls, "author": organization, "breadcrumb": breadcrumbs, "datePublished": current_timestamp, } def get_news_summary(title, content): ## Get SEO Optimized Summary total_tokens = int(len(content)/4) prompt = f"""Rewrite this text in roughly {total_tokens} tokens : Text: {content} Note: Do not add any additional information whatsoever.""" # prompt = f"""Summarize the content of this News Article in a highly SEO-optimized way, keeping it under 200 words. # Include key points that would appeal to a reader and potential search engines: # Title: {title}, Content: {content} # """ summary = call_openai(prompt, temperature=1, max_tokens=8000) return summary def get_keywords(title, content): ## Get SEO Optimized Keywords prompt = f"""From the content and summary provided, extract 5-10 SEO-optimized keywords that are highly relevant to the news topic: Title: {title}, Content: {content} Output format: - Return only the refined array of keywords - Do not include any explanatory text or commentary """ keywords = call_genai(prompt, temperature=1, max_tokens=1000) keywords = string_to_array(keywords) return keywords def generate_title(keywords, summary): ## Get SEO Optimized Title prompt = f"""Using the following keywords and summary, create a catchy SEO-optimized title for the news article. Make it 150 characters or less and include at least one power word. Keywords: {keywords}. Summary: {summary} 4. Output format: - Return only the suggested title - Do not include any explanatory text, quotation marks, or additional commentary""" title = call_genai(prompt, temperature=1, max_tokens=1000) return title def get_keyphrases(title, keywords, summary): ## Get SEO Optimized KeyPhrases prompt = f"""Using the following title, keywords, and summary, generate a set of 5-7 SEO-friendly keyphrases. Ensure the keyphrases are relevant and likely to rank well in search engines. Title: {title}, Keywords: {keywords}, Summary: {summary} 3. Output format: - Return results as an array of strings - Each keyphrase should be its own element in the array - Do not include any explanatory text or commentary - If fewer than 10 suitable keyphrases are found, include only those that meet the criteria Example output format: ["keyphrase 1", "keyphrase 2", "keyphrase 3", ...]""" keyphrases = call_genai(prompt, temperature=1, max_tokens=1000) keyphrases = string_to_array(keyphrases) return keyphrases def generate_summary(keywords, keyphrases, summary): #return summary ## Rewrite Summary - SEO Optimized prompt = f"""Task: Generate an SEO-optimized news blog's meta description: Inputs: - Keywords: {keywords} - Key phrases: {keyphrases} - Reference Text: {summary} Output format: - Provide only the 150-word summary - DO NOT OUTPUT ANY OTHER TEXT WITH THE RESPONSE """ summary = call_genai(prompt, 0, 1000) return summary def generate_images(keywords, title, summary, keyphrases): ## Get Image Description prompt = f""" Task: Suggest two SEO-optimizing AI image descriptions (DALL-E 3) for News Article. Inputs: a. Keywords: {keywords} b. Title: {title} c. Context: {summary} d. KeyPhrases: {keyphrases} Output format: [ "Detailed description of first image, incorporating relevant keywords and focusing on a key aspect of the blog topic or Farmonaut's services.", "Detailed description of second image, highlighting a different facet of the blog content or Farmonaut's technology, using appropriate keywords." ] Note: Focus on creating descriptions that would result in images that add significant value to the blog post while optimizing for search engines. Ensure descriptions are distinct from each other and highly relevant to the content. DO NOT OUTPUT ANY OTHER TEXT WITH THE RESPONSE. """ image_descriptions = call_genai(prompt, 1, 1000) print("\nImage Descriptions:") image_descriptions = string_to_array(image_descriptions) ## Generate Images media_url_arr = [] for image_description in image_descriptions: try: media_url_arr.append(ai_image_url(image_description)) print("media url", media_url_arr) except: print(traceback.format_exc()) return media_url_arr def generate_news_article(title, keywords, keyphrases, summary, media): ## Generate News Article prompt = f"""Using the title, keywords, keyphrases, and summary, write a news article of upto 1500 words. Make sure the article is SEO-optimized, informative, engaging, and integrates the keyphrases naturally. Title: {title}, Keywords: {keywords}, Keyphrases: {keyphrases}, Summary: {summary} Key requirements: - Use HTML formatting including ,

,

, , ,
    ,
      ,
      , and

      tags where appropriate - Include and naturally incorporate as many of these keywords: {keywords} - Include these key phrases: {keyphrases} - Write in the language of keywords/keywords if keywords/phrases are not in English - Mandatorily Localize the content if location names are available in keywords. - Add all the images from this JSON object: {media} with border-radius:16px, box-shadow: 10px 10px 15px, cursor: pointer. These images should open https://farmonaut.com/app_redirect when clicked. All images should be placed within 75% content of the blog. - Add links: - App Button Image: https://farmonaut.com/Images/web_app_button.png, Link on this button: https://farmonaut.com/app_redirect, height: 80px - API: https://sat.farmonaut.com/api - API Developer Docs:https://farmonaut.com/farmonaut-satellite-weather-api-developer-docs/ - Android App Button Image: https://farmonaut.com/wp-content/uploads/2020/01/get_it_on_google_play.png, Link on this Button: https://play.google.com/store/apps/details?id=com.farmonaut.android, height: 80px - iOS App Button Image: https://farmonaut.com/wp-content/uploads/2020/01/available_on_app_store.png, Link on this Button: https://apps.apple.com/in/app/farmonaut/id1489095847, height: 80px - Distribute these links within top 75% content of the blog with bold font - Use bullet points and subheadings (font color: #034d5c) to improve readability - Make the content mobile responsive Additional guidelines: - Achieve Flesch Reading Ease Score of at least 60 - Provide detailed explanations and examples - Ensure all content is factual and based on provided information - Organize information logically with clear transitions between sections - Use varied sentence structures and vocabulary for engaging reading - Mandatorily implement all the latest SEO guidelines provided by Google News Please generate the news article based on these requirements, ensuring it's well-structured, and upto 1500 words long. """ news_content = call_genai(prompt, temperature=1, max_tokens=8000) return news_content def generate_and_publish_news_article(article): # Extract the article content (or provide a default value if not available) title_article = article.get("title", "No title available") content_article = article.get("body", "No content available") news_summary = content_article summary = news_summary # ## Get News article Data # news_summary = get_news_summary(title_article, content_article) # print("Getting keywords") keywords = get_keywords(title_article, content_article) title = generate_title(keywords, news_summary) print("title", title) print("Getting keyphrases") keyphrases = get_keyphrases(title, keywords, news_summary) excerpt = generate_summary(keywords, keyphrases, news_summary) # print("Summary Generated :- ", summary) ## Generate Images media_url_arr = generate_images(keywords, title, summary, keyphrases) media_info = process_media2(media_url_arr, title) print("Media Processed") ## post_data = {"caption": excerpt, "media": []} if media_info: post_data["media"].extend(media_info) post_data["featured_media"] = media_info[0]["id"] if media_info else None media = post_data["media"] news_content = generate_news_article(title, keywords, keyphrases, summary, media) print("news content processed \n") ## blog URL blog_url = "https://farmonaut.com/news/" + create_slug(title) ## Generate Schema image_urls = [item["url"] for item in media_info if item["type"] == "image"] structured_schema = create_news_schema(title, image_urls, blog_url) # Convert structured data to JSON-LD format structured_schema_script = ( f'' ) post_data["title"] = title post_data["content"] = structured_schema_script + news_content post_data["status"] = "publish" post_data["excerpt"] = excerpt post_data["comment_status"] = "open" post_data["categories"] = [573] ## need to hard code for news ## Publish to website publish_or_update_wordpress_post(post_data) return post_data def publish_news_spaced(articles): """ Publish news articles spaced equally over 24 hours. Args: articles (list): List of articles to publish. generate_and_publish_news_article (func): Function to generate and publish an article. """ total_articles = len(articles) if total_articles == 0: print("No articles to publish.") return # Calculate time interval in seconds interval = ( 24 * 3600 / total_articles ) # 24 hours in seconds, divided by the number of articles for article in tqdm(articles, desc="Processing News articles"): try: generate_and_publish_news_article(article) # Generate and publish the article time.sleep( interval ) # Wait for the calculated interval before publishing the next article except: print(traceback.format_exc()) while True: ####### News APIKEY ####### APIKEY = "ae5e326f-a428-41d3-b0c8-09f1746f98b1" ## Initailize News Api er = EventRegistry(apiKey=APIKEY, allowUseOfArchive=False) ### Usage #location = "http://en.wikipedia.org/wiki/United_States" # start_date = "2024-10-12" # Example start date # end_date = "2024-10-13" # Example end date # Get today's date and yesterday's date end_date = datetime.today().strftime("%Y-%m-%d") start_date = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d") # Fetch articles articles = get_news_articles(start_date, end_date, max_items=100) # Print the number of articles fetched print(f"Number of articles fetched: {len(articles)}") publish_news_spaced(articles) time.sleep(2*60*60)