import ast
import json
import mimetypes
import os
import ssl
import tempfile
import time
import traceback
from urllib.parse import urlparse

import anthropic
import cv2
import instaloader
import numpy as np
import openpyxl
import pandas as pd
import pytesseract
import requests
from dotenv import load_dotenv
from PIL import Image
from requests.auth import HTTPBasicAuth
from sklearn.feature_extraction.text import TfidfVectorizer


def string_to_array(string):
    # Use ast.literal_eval to safely evaluate the string as a list
    array = ast.literal_eval(string)
    return array


def get_first_column_values(file_path, sheet_name=None):
    # Load the workbook
    workbook = openpyxl.load_workbook(file_path, data_only=True)

    # If a sheet name is specified, load that sheet, otherwise use the active sheet
    sheet = workbook[sheet_name] if sheet_name else workbook.active

    # Get all the values from the first column (Column A)
    first_column_values = []
    for cell in sheet['A']:
        # Convert cell value to string and append to list, handle empty cells
        first_column_values.append(str(cell.value) if cell.value is not None else "")

    return first_column_values

# Example usage:
# file_path = 'your_file.xlsx'
# values = get_first_column_values(file_path, 'Sheet1')
# print(values)


def get_file_extension(url):
    # Parse the URL and take only its path component
    parsed_url = urlparse(url)
    path = parsed_url.path

    # Extract the file extension
    file_extension = os.path.splitext(path)[1]

    # Return the extension (without the dot) or an empty string if there's no extension
    return file_extension[1:] if file_extension else ""


# Instagram media CDNs occasionally fail certificate checks; skip verification
ssl._create_default_https_context = ssl._create_unverified_context

# Load environment variables
load_dotenv()

# Instagram session id (you need to get this from your browser after logging in to Instagram)
SESSIONID = os.getenv("INSTAGRAM_SESSIONID")

# Headers for Instagram requests
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.74 Safari/537.36 Edg/79.0.309.43",
    "cookie": f'sessionid={SESSIONID};'
}

# Initialize Instaloader (media only: skip metadata, geotags, and comments)
L = instaloader.Instaloader()
L.post_metadata_txt_pattern = ""
L.download_geotags = False
L.save_metadata = False
L.save_metadata_json = False
L.download_comments = False

# WordPress credentials -- read from the environment instead of hardcoding
# secrets in the source (WP_USERNAME / WP_APP_PASSWORD are the env var names
# assumed here)
wp_url = "https://www.farmonaut.com/wp-json/wp/v2"
wp_username = os.getenv("WP_USERNAME")
wp_password = os.getenv("WP_APP_PASSWORD")
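
# --- Hedged usage sketch (not part of the pipeline) ---
# A quick way to confirm the WordPress Application Password works before
# running the whole pipeline: the core REST route /wp/v2/users/me returns the
# authenticated user. WP_USERNAME / WP_APP_PASSWORD are the env var names
# assumed above, not names from the original script.
#
# auth = HTTPBasicAuth(os.getenv("WP_USERNAME"), os.getenv("WP_APP_PASSWORD"))
# resp = requests.get("https://www.farmonaut.com/wp-json/wp/v2/users/me", auth=auth)
# print(resp.status_code, resp.json().get("name"))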
def remove_keywords(selected_keywords, excel_file, sheet_name, keyword_column, num_keywords=5):
    # Read the Excel sheet
    df = pd.read_excel(excel_file, sheet_name=sheet_name)

    # Ensure the keyword column exists
    if keyword_column not in df.columns:
        raise ValueError(f"Column '{keyword_column}' not found in the Excel sheet.")

    # Remove the selected keywords from the DataFrame
    df = df[~df[keyword_column].isin(selected_keywords)]

    # Save the updated DataFrame back to the Excel file
    df.to_excel(excel_file, sheet_name=sheet_name, index=False)


def select_and_remove_keywords(text, excel_file, sheet_name, keyword_column, num_keywords=5):
    # Read the Excel sheet
    df = pd.read_excel(excel_file, sheet_name=sheet_name)

    # Ensure the keyword column exists
    if keyword_column not in df.columns:
        raise ValueError(f"Column '{keyword_column}' not found in the Excel sheet.")

    # Get the list of keywords
    keywords = df[keyword_column].tolist()

    # Create a TF-IDF vectorizer
    vectorizer = TfidfVectorizer()

    # Ensure text and keywords are strings
    text = str(text) if isinstance(text, dict) else text
    keywords = [str(keyword) if isinstance(keyword, dict) else keyword for keyword in keywords]

    # Fit the vectorizer on the text and transform the keywords
    tfidf_matrix = vectorizer.fit_transform([text] + keywords)

    # Calculate cosine similarity between the text and each keyword
    cosine_similarities = (tfidf_matrix * tfidf_matrix.T).toarray()[0][1:]

    # Get the indices of the top num_keywords similar keywords
    top_indices = np.argsort(cosine_similarities)[-num_keywords:][::-1]

    # Select the top keywords
    selected_keywords = [keywords[i] for i in top_indices]

    # Remove the selected keywords from the DataFrame
    df = df[~df[keyword_column].isin(selected_keywords)]

    # Save the updated DataFrame back to the Excel file
    df.to_excel(excel_file, sheet_name=sheet_name, index=False)

    return selected_keywords


def call_genai(prompt, temperature, max_tokens):
    # The API key is read from the environment (ANTHROPIC_API_KEY) instead of
    # being hardcoded in the source
    client = anthropic.Anthropic(
        api_key=os.getenv("ANTHROPIC_API_KEY"),
    )
    message = client.messages.create(
        model="claude-3-5-sonnet-20240620",
        max_tokens=max_tokens,
        temperature=temperature,
        system="You are an SEO expert, and a representative of Farmonaut (farmonaut.com).",
        messages=[
            {"role": "user", "content": prompt}
        ]
    )
    return message.content[0].text
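
# --- Hedged usage sketch ---
# select_and_remove_keywords() ranks every keyword in the sheet by TF-IDF
# cosine similarity against the caption and pops the best matches from the
# file. The caption text and output below are illustrative only:
#
# caption = "Satellite imagery helps farmers monitor crop health remotely."
# picked = select_and_remove_keywords(caption, 'final_keywords.xlsx',
#                                     'Sheet1', 'Keywords', num_keywords=5)
# print(picked)  # e.g. ['crop health', 'satellite imagery', ...]
#
# call_genai() wraps a single-turn Claude message; temperature 0 keeps the
# yes/no classification prompts deterministic:
#
# answer = call_genai("Is this caption about a festival? Answer Yes or No: " + caption, 0, 10)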
def upload_media_to_wordpress(file_path, title):
    endpoint = f"{wp_url}/media"
    auth = HTTPBasicAuth(wp_username, wp_password)

    mime_type, _ = mimetypes.guess_type(file_path)

    media_data = {
        'alt_text': title,
        'caption': title,
        'description': title
    }

    with open(file_path, 'rb') as file:
        files = {'file': (os.path.basename(file_path), file, mime_type)}
        # The metadata must travel as form fields (`data=`) next to the file;
        # requests silently drops `json=` when `files=` is present
        response = requests.post(endpoint, files=files, data=media_data, auth=auth)

    if response.status_code == 201:
        return response.json()['id'], response.json()['source_url']
    else:
        print(f"Failed to upload media. Status code: {response.status_code}")
        print(f"Response: {response.text}")
        return None, None


def extract_text_from_video(video_path):
    # Sample one frame per second of video and OCR each frame
    video = cv2.VideoCapture(video_path)
    fps = int(video.get(cv2.CAP_PROP_FPS))
    frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = frame_count / fps

    text = ""
    for i in range(0, int(duration), 1):
        video.set(cv2.CAP_PROP_POS_MSEC, i * 1000)
        success, frame = video.read()
        if not success:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        frame_text = pytesseract.image_to_string(gray)
        text += frame_text + "\n"

    video.release()
    return text


def extract_text_from_image(image_path):
    image = Image.open(image_path)
    text = pytesseract.image_to_string(image)
    return text


def process_media(post, title):
    media_info = []

    with tempfile.TemporaryDirectory() as tmpdir:
        L.download_post(post, target=tmpdir)

        media_files = os.listdir(tmpdir)
        for file in media_files:
            file_path = os.path.join(tmpdir, file)
            if file.endswith('.mp4'):
                text = extract_text_from_video(file_path)
                media_type = 'video'
            elif file.endswith(('.jpg', '.jpeg', '.png')):
                text = extract_text_from_image(file_path)
                media_type = 'image'
            else:
                continue  # Skip files that are neither videos nor images

            # Upload the file to WordPress
            media_id, media_url = upload_media_to_wordpress(file_path, title)
            if media_id and media_url:
                media_info.append({
                    'type': media_type,
                    'text': text,
                    'id': media_id,
                    'url': media_url
                })

    return media_info


def process_media2(media_url_arr, title):
    print(media_url_arr)
    media_info = []
    media_num = 0
    folder_name = 'insta_files'
    os.makedirs(folder_name, exist_ok=True)

    # Download every media URL into insta_files/<n>.<ext>
    for url in media_url_arr:
        print(url)
        media_num = media_num + 1
        file_path = folder_name + '/' + str(media_num) + '.' + str(get_file_extension(url))

        response = requests.get(url, stream=True)
        if response.status_code == 200:
            with open(file_path, 'wb') as file:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        file.write(chunk)
            print(f"File downloaded successfully: {file_path}")
        else:
            print(f"Failed to download file. Status code: {response.status_code}")

    # OCR each downloaded file and push it to the WordPress media library
    media_files = os.listdir(folder_name)
    for file in media_files:
        file_path = os.path.join(folder_name, file)
        if file.endswith('.mp4'):
            text = extract_text_from_video(file_path)
            media_type = 'video'
        elif file.endswith(('.jpg', '.jpeg', '.png')):
            text = extract_text_from_image(file_path)
            media_type = 'image'
        else:
            continue  # Skip files that are neither videos nor images

        media_id, media_url = upload_media_to_wordpress(file_path, title)
        if media_id and media_url:
            media_info.append({
                'type': media_type,
                'text': text,
                'id': media_id,
                'url': media_url
            })

    return media_info


def publish_to_wordpress(title, content, media_info, excerpt):
    endpoint = f"{wp_url}/posts"
    auth = HTTPBasicAuth(wp_username, wp_password)

    # Append the uploaded images to the content. NOTE: the <img> markup was
    # stripped from the original source; this tag is a reconstruction built
    # around the surviving "Instagram image" alt text.
    for media in media_info:
        if media['type'] == 'image':
            content += f'\n\n<img src="{media["url"]}" alt="Instagram image" />'

    slug = title.replace(' ', '-')

    post_data = {
        'title': title,
        'content': content,
        'status': 'publish',
        'excerpt': excerpt,
        'slug': slug,
        'comment_status': 'open',
        'featured_media': media_info[0]['id'] if media_info else None  # First image becomes the featured image
    }

    response = requests.post(endpoint, json=post_data, auth=auth)

    if response.status_code == 201:
        print("Post published successfully!")
    else:
        print(f"Failed to publish post. Status code: {response.status_code}")
        print(f"Response: {response.text}")
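
# --- Hedged data-structure sketch ---
# Shape of the media_info list that process_media2() returns and
# publish_to_wordpress() consumes (the values here are illustrative only):
#
# media_info = [
#     {
#         'type': 'image',                      # 'image' or 'video'
#         'text': 'OCR text found in the file',
#         'id': 1234,                           # WordPress attachment ID
#         'url': 'https://www.farmonaut.com/wp-content/uploads/.../1.jpg'
#     },
# ]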
def get_instagram_posts(username, limit=100):
    # `posts` was previously undefined here; the commented-out Instaloader
    # profile lookup from the original source is restored to make the
    # function runnable
    profile = instaloader.Profile.from_username(L.context, username)
    posts = list(profile.get_posts())
    return posts[:limit]
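
# --- Hedged data sketch ---
# main() reads pre-scraped posts from insta_posts2.json; judging by the keys
# accessed below, each entry looks roughly like this (field values invented):
#
# [
#   {
#     "caption": "Monitoring sugarcane health with satellite data...",
#     "images": ["https://.../slide1.jpg", "https://.../slide2.jpg"],
#     "displayUrl": "https://.../cover.jpg"
#   }
# ]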
def main():
    instagram_username = "farmonaut"

    # Posts are read from a pre-scraped JSON dump rather than fetched live
    with open('insta_posts2.json') as f:
        posts = json.load(f)
    # posts = get_instagram_posts(instagram_username)
    # posts = posts['data']['xdt_api__v1__feed__user_timeline_graphql_connection']['edges']

    for post in posts:
        try:
            post_data = {
                'caption': post['caption'],
                'media': []
            }
            print(post_data['caption'])

            # Skip festival greetings: they make poor evergreen blog posts
            prompt = "Can you predict if this is a text related to a festival. Strictly Answer Yes or No: " + post_data['caption']
            is_this_a_festival_post = call_genai(prompt, 0, 10)
            print(is_this_a_festival_post)

            if "no" in is_this_a_festival_post.lower():
                prompt = "Make an interesting title for this blog. Strictly output only the blog title. No other text.: " + post_data['caption']
                title = call_genai(prompt, 0, 50)
                title = title.replace('"', "")

                all_keywords = get_first_column_values('final_keywords.xlsx', sheet_name='Sheet1')

                prompt = f"Based upon this array of keywords: {all_keywords} Return 5 keywords in an array format that best match this text: {post_data['caption']}. Strictly return data in array format. Don't include any other text in the response."
                keywords = call_genai(prompt, 0, 150)
                print(keywords)

                try:
                    remove_keywords(string_to_array(keywords), 'final_keywords.xlsx', 'Sheet1', 'Keywords')
                except Exception:
                    print(traceback.format_exc())

                # Count the carousel images, if any
                media_count = post.get("images", [])
                try:
                    media_count = len(media_count)
                except Exception:
                    media_count = 0

                media_url_arr = []
                if int(media_count) > 0:
                    # This is a carousel post with multiple media items
                    for media_url in post.get("images"):
                        media_url_arr.append(media_url)
                else:
                    # This is a single image/video post
                    media_url_arr.append(post['displayUrl'])

                media_info = process_media2(media_url_arr, title)
                if media_info:
                    post_data['media'].extend(media_info)

                # NOTE: the list of HTML tags in this prompt was stripped from
                # the original source; the tags below are a reconstruction of
                # a plausible set
                prompt = f"Generate a HTML formatted blog post with <h1>, <h2>, <p>, <strong>, <ul>, <ol>, <li>, <table>, <blockquote> blocks wherever necessary of at least 1200 words in informational tone and as a first-person plural (we, us, our, ours) based on the following Instagram post: {post_data['caption']} \n\n Please don't add any hallucinated information about Farmonaut. Try to incorporate these keywords into the blog: {keywords}. If any of the keywords look unrelated and out of context, then don't add them to the blog. Add Images (URLs) from this JSON object {post_data['media']} into the blog in <img> blocks wherever necessary including the absolute top of the blog. Please include an FAQ section as well."
                blog_content = call_genai(prompt, 1, 4000)
                print(blog_content)

                publish_to_wordpress(title, blog_content, post_data['media'], post_data['caption'])
        except Exception:
            print(traceback.format_exc())

        # Long pause between posts (delay kept from the original script)
        time.sleep(100000)


if __name__ == "__main__":
    main()
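
# --- Hedged configuration sketch ---
# Environment variables the script expects in its .env file. The WP_* names
# are assumptions introduced in this revision; the original source hardcoded
# those two values instead.
#
# .env
#   INSTAGRAM_SESSIONID=<session id copied from your browser>
#   ANTHROPIC_API_KEY=sk-ant-...
#   WP_USERNAME=<wordpress user>
#   WP_APP_PASSWORD=<wordpress application password>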