import cv2
import numpy as np
import json
import requests
from PIL import Image
import io
import os
from datetime import datetime
import firebase_admin
from firebase_admin import credentials, firestore, storage
import argparse

database_url = "https://farmbase-b2f7e-31c0c.firebaseio.com/"
storage_url = "farmbase-b2f7e.appspot.com"

cred = credentials.Certificate("credentials.json")
if not firebase_admin._apps:
    default_app = firebase_admin.initialize_app(
        cred, {"databaseURL": database_url, "storageBucket": storage_url}
    )
else:
    print("Already initialized")
    default_app = firebase_admin._apps["[DEFAULT]"]

# Get the Firebase Storage bucket
bucket = storage.bucket()


def detect_color_change(img1_path, img2_path, output_path, threshold=0, text=""):
    """
    Detect significant color differences (e.g., green to brown) between two images.

    Args:
        img1_path (str): Path to the first image (before uprooting).
        img2_path (str): Path to the second image (after uprooting).
        output_path (str): Path to save the output image with marked polygons.
        threshold (int): Minimum contour area for a change to be drawn.
        text (str): Caption drawn above the output image.

    Returns:
        bool: True if an uprooted area is detected, otherwise False.
    """
    # Load images
    img1 = cv2.imread(img1_path)
    img2 = cv2.imread(img2_path)

    if img1 is None or img2 is None:
        print("Error: One or both images could not be loaded.")
        return False

    # Resize images to the same size if they differ
    if img1.shape != img2.shape:
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))

    # Convert both images to HSV color space for better color detection
    img1_hsv = cv2.cvtColor(img1, cv2.COLOR_BGR2HSV)
    img2_hsv = cv2.cvtColor(img2, cv2.COLOR_BGR2HSV)

    # Define HSV ranges for green and brown
    lower_green = np.array([35, 50, 50])
    upper_green = np.array([85, 255, 255])
    lower_brown = np.array([10, 50, 20])
    upper_brown = np.array([30, 255, 200])

    # Create masks for green areas in the first image and brown areas in the second
    mask_green1 = cv2.inRange(img1_hsv, lower_green, upper_green)
    mask_brown2 = cv2.inRange(img2_hsv, lower_brown, upper_brown)

    # Detect areas where green changed to brown
    uprooted_area = cv2.bitwise_and(mask_green1, mask_brown2)

    # Clean up the mask using morphological operations
    kernel = np.ones((3, 3), np.uint8)
    uprooted_area = cv2.morphologyEx(uprooted_area, cv2.MORPH_CLOSE, kernel)

    # Find contours of the uprooted areas
    contours, _ = cv2.findContours(uprooted_area, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Draw the detected polygons on the second image
    output_img = img2.copy()
    for contour in contours:
        area = cv2.contourArea(contour)
        if area > threshold:  # Filter small areas
            cv2.drawContours(output_img, [contour], -1, (0, 0, 255), 2)  # Red polygon

    # Create a blank space at the top for the caption text
    padding = 100
    output_img = cv2.copyMakeBorder(output_img, padding, 0, 0, 0, cv2.BORDER_CONSTANT, value=(255, 255, 255))

    # Wrap the caption text so it does not exceed the image width
    img_width = output_img.shape[1]
    font_scale = 0.5
    font_thickness = 1
    font = cv2.FONT_HERSHEY_SIMPLEX
    words = text.split()
    line = ""
    y_pos = 20  # Start drawing text from this Y position
    for word in words:
        # Check if the current line plus the new word would exceed the image width
        line_with_word = f"{line} {word}".strip()
        text_size = cv2.getTextSize(line_with_word, font, font_scale, font_thickness)[0]
        if text_size[0] > img_width - 20:
            # If the line is too long, draw it and start a new line
            cv2.putText(output_img, line, (10, y_pos), font, font_scale, (0, 0, 0), font_thickness)
            line = word
            y_pos += 20  # Move down to the next line
        else:
            line = line_with_word

    # Draw the final line
    cv2.putText(output_img, line, (10, y_pos), font, font_scale, (0, 0, 0), font_thickness)

    # Convert the OpenCV image (BGR) to a Pillow image (RGB)
    output_img = cv2.cvtColor(output_img, cv2.COLOR_BGR2RGB)
    pil_img = Image.fromarray(output_img)

    # Save the output image at 300 DPI
    pil_img.save(output_path, dpi=(300, 300))
    print(f"Output saved to: {output_path}")

    # Check if any uprooted area is detected (non-zero values in the mask)
    if np.any(uprooted_area):
        print("Uprooted area detected.")
        return True

    print("No uprooted area detected.")
    return False

    # Display the result (optional)
    # cv2.imshow("Uprooted Areas", output_img)
    # cv2.waitKey(0)
    # cv2.destroyAllWindows()
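
# Illustrative sketch (commented out so the script's behavior is unchanged): how
# detect_color_change might be called on two local TCI tiles. The file names below
# are placeholders, not paths produced elsewhere in this script.
# if detect_color_change("tci_before.png", "tci_after.png", "comparison.png",
#                        text="FieldID: 123, FieldName: Demo, PolygonID: 456"):
#     print("Possible uprooting between the two dates.")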
def download_image(UID, fieldID, polygonID, day, parameter, local_image_dir, zoom_factor=7):
    try:
        # Create the local directory if it does not exist
        local_dir = os.path.dirname(local_image_dir)
        if not os.path.exists(local_dir) and local_dir != '':
            os.makedirs(local_dir)

        # Reference to the image in the Firebase Storage bucket
        blob = bucket.blob(
            f"PaidMonitoredFieldsPolygons/{UID}/{fieldID}/{polygonID}/{day}/{parameter}.png"
        )

        # Check that the file exists in Firebase Storage before downloading
        if not blob.exists():
            print("Error: File does not exist in Firebase Storage.")
            return

        # Local file paths for the final and temporary images
        local_filename = f"{local_dir}/{parameter}_{day}.png"
        temp_filename = f"{local_dir}/temp_{parameter}_{day}.png"

        # Download the file to a temporary location
        blob.download_to_filename(temp_filename)

        # Check that the image was actually downloaded
        if not os.path.exists(temp_filename):
            print(f"Error: Failed to download the image to {temp_filename}.")
            return

        # Load the image using OpenCV
        image = cv2.imread(temp_filename)

        # Verify that the image was loaded correctly
        if image is None:
            print(f"Error: Failed to load the image from {temp_filename}. The file might be corrupted or not a valid image.")
            return

        # Zoom the image by the specified zoom factor
        zoomed_image = cv2.resize(image, (0, 0), fx=zoom_factor, fy=zoom_factor, interpolation=cv2.INTER_LINEAR)

        # Save the zoomed image to the desired local path
        cv2.imwrite(local_filename, zoomed_image)
        # print(f"File downloaded and zoomed. Saved to {local_filename}")

        # Remove the temporary file
        os.remove(temp_filename)
    except Exception as e:
        print(f"An error occurred: {e}")


def extract_matching_data(file_path, target_polygon_id):
    # Open and load the JSON file
    with open(file_path, 'r') as file:
        data = json.load(file)

    # Check if the target_polygon_id exists as a key in the outer dictionary
    if target_polygon_id in data:
        return data[target_polygon_id]  # Return the corresponding inner dictionary
    else:
        return {"error": f"polygonID {target_polygon_id} not found."}


def fetch_image_url_from_api(uid, field_id, polygon_id, sensed_day, parameter, color_map="1"):
    endpoint_url = "https://us-central1-farmbase-b2f7e.cloudfunctions.net/getPolygonSatelliteImage"
    body_obj = {
        "UID": uid,
        "FieldID": field_id,
        "PolygonID": polygon_id,
        "ImageType": parameter,
        "RequestedDay": str(sensed_day),
        "ColorMap": str(color_map)
    }

    response = requests.post(endpoint_url, json=body_obj)
    if response.status_code == 200:
        response_json = response.json()
        image_url = response_json.get('url')  # Adjust this key based on the actual API response
        if image_url:
            return image_url
        raise ValueError("Image URL not found in the response.")
    raise Exception(f"Failed to fetch image URL. Status code: {response.status_code}")
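
# Illustrative sketch (commented out; the IDs below are placeholders): fetching an
# image URL through the Cloud Function instead of reading Firebase Storage directly.
# url = fetch_image_url_from_api("some_uid", "some_field_id", "some_polygon_id",
#                                "20231201", "TCI")
# print(url)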
Status code: {response.status_code}") from datetime import datetime def get_first_and_last_december_dates(sensed_days): # Convert strings to datetime objects dates = [datetime.strptime(date, "%Y%m%d") for date in sensed_days] # Filter for December 2023 and December 2024 dec_2023 = [date for date in dates if date.year == 2023 and date.month == 12] dec_2024 = [date for date in dates if date.year == 2024 and date.month == 12] # Find the first date for Dec'23 and the last date for Dec'24 first_dec_2023 = min(dec_2023) if dec_2023 else None last_dec_2024 = max(dec_2024) if dec_2024 else None # Return the result as a list return [ first_dec_2023.strftime("%Y%m%d") if first_dec_2023 else None, last_dec_2024.strftime("%Y%m%d") if last_dec_2024 else None ] def get_nov_dec_dates(sensed_days): # Convert strings to datetime objects dates = [datetime.strptime(date, "%Y%m%d") for date in sensed_days] # Filter for November and December 2023, 2024 nov_2023 = [date for date in dates if date.year == 2023 and date.month == 11] dec_2023 = [date for date in dates if date.year == 2023 and date.month == 12] nov_2024 = [date for date in dates if date.year == 2024 and date.month == 11] dec_2024 = [date for date in dates if date.year == 2024 and date.month == 12] # Combine all dates from the specified months all_dates = nov_2023 + dec_2023 + nov_2024 + dec_2024 # Sort dates in ascending order all_dates_sorted = sorted(all_dates) # Return the result as a list of formatted strings return [date.strftime("%Y%m%d") for date in all_dates_sorted] def pull_data_and_images_to_json(file_path, polygonID): result = {} field_data = extract_matching_data(file_path, target_polygon_id=polygonID) #print(field_data) sensed_days = field_data["SensedDays"].keys() field_name = field_data["Name"] UID = field_data["UID"] fieldID = field_data["fieldID"] print("Pulling field data") for parameter in ["TCI"]: #dates = get_first_and_last_december_dates(sensed_days) dates = get_nov_dec_dates(sensed_days) #dates = ["20231121"] for day in dates: image_dir = f"{UID}/{fieldID}/{polygonID}/images_test/{parameter}/" download_image(UID, fieldID, polygonID, day, parameter, image_dir) print(f"{parameter} images saved at {image_dir}") return dates, UID, fieldID, field_name def find_nonCloudy_dates(date_dict): # Convert string keys to datetime objects for easier comparison dates = {datetime.strptime(date, "%Y%m%d"): status for date, status in date_dict.items()} # Function to find the required dates for a given year def find_december_and_november_dates(year): # December dates for the given year dec_dates = [date for date in dates if date.year == year and date.month == 12 and not dates[date]] if dec_dates: return min(dec_dates).strftime("%Y%m%d") # Return the first False date in December # If no False dates in December, check November nov_dates = [date for date in dates if date.year == year and date.month == 11 and not dates[date]] if nov_dates: return max(nov_dates).strftime("%Y%m%d") # Return the last False date in November return None # Return None if no suitable date is found # Find the required dates for both 2023 and 2024 dec_2023_false = find_december_and_november_dates(2023) dec_2024_false = find_december_and_november_dates(2024) selected_dates = [dec_2023_false, dec_2024_false] return selected_dates def detect_clouds(image_path, threshold=250, min_contour_area=500): # Load the image in RGB format image = cv2.imread(image_path) image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # Convert to grayscale (brightness channel) grayscale = 
def process_polygons(file_path, output_folder):
    with open(file_path, 'r') as file:
        data = json.load(file)

    # The top-level keys of the JSON object are the polygon IDs
    polygonID_list = list(data.keys())

    uprooted_polygons = []
    not_uprooted_polygons = []
    print(f"Total Polygons: {len(polygonID_list)}")

    # Create the output directory under "logs" if it does not exist
    output_directory = os.path.join("logs", output_folder)
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    # Open the result files in write mode
    with open(f'logs/{output_folder}/successful_polygons.txt', 'w') as successful_file, \
         open(f'logs/{output_folder}/error_polygons.txt', 'w') as error_file, \
         open(f'logs/{output_folder}/uprooted_polygons.txt', 'w') as uprooted_file, \
         open(f'logs/{output_folder}/non_uprooted_polygons.txt', 'w') as non_uprooted_file:

        for polygonID in polygonID_list:  # Slice the list here (e.g., [1:3]) to process a subset
            try:
                print(f"Processing PolygonID: {polygonID}")
                dates, UID, fieldID, fieldName = pull_data_and_images_to_json(file_path, polygonID)
                sorted_dates = sorted(dates)

                # Flag each downloaded image as cloudy or not
                isCloudy_flag = {}
                parameter = "TCI"
                image_dir = f"{UID}/{fieldID}/{polygonID}/images_test/{parameter}"
                for day in sorted_dates:
                    image_path = f"{image_dir}/{parameter}_{day}.png"
                    isCloudy = detect_clouds(image_path)
                    isCloudy_flag[day] = isCloudy

                # Pick one non-cloudy date per year and compare the two images
                compare_dates = find_nonCloudy_dates(isCloudy_flag)
                sorted_dates = sorted(compare_dates)
                img1_path = f"{image_dir}/{parameter}_{sorted_dates[0]}.png"
                img2_path = f"{image_dir}/{parameter}_{sorted_dates[1]}.png"
                output_path = f"{image_dir}/comparison_{sorted_dates[0]}_{sorted_dates[1]}.png"
                text = f"FieldID: {fieldID}, FieldName: {fieldName}, PolygonID: {polygonID}"
                uprooted_area_flag = detect_color_change(img1_path, img2_path, output_path, text=text)

                if uprooted_area_flag:
                    uprooted_polygons.append(polygonID)
                    uprooted_file.write(f"{polygonID}\n")
                else:
                    not_uprooted_polygons.append(polygonID)
                    non_uprooted_file.write(f"{polygonID}\n")

                # Log successful polygon ID
                successful_file.write(f"{polygonID}\n")
            except Exception as e:
                print(f"Error processing PolygonID {polygonID}: {e}")
                # Log the polygon ID that encountered an error
                error_file.write(f"{polygonID}\n")

            # Remove the downloaded images for this polygon
            cmd_del = f"rm -rf {UID}/{fieldID}/{polygonID}"
            os.system(cmd_del)


def main():
    # Set up argument parser
    parser = argparse.ArgumentParser(description="Process polygons from a JSON file.")
    parser.add_argument('json_file', type=str, help="Path to the JSON file to process.")
    parser.add_argument('output_folder', type=str, help="Name of the output folder created under logs/.")
    args = parser.parse_args()

    # Process the polygons using the provided JSON file path
    process_polygons(args.json_file, args.output_folder)


if __name__ == "__main__":
    main()
__name__ == "__main__": # # UID = "TCXcp5VIsfhHZrh0nm2VsgBtcGy2" # # fieldID = "1655556970917" # # polygonID = "1670995374649" # file_path = "full_gavl_polygons_obj.json" # with open(file_path, 'r') as file: # data = json.load(file) # # Extract the top-level keys # polygonID_list = list(data.keys()) # uprooted_polygons = [] # not_uprooted_polygons = [] # print(len(polygonID_list)) # # Open the result files in write mode # with open('logs/successful_polygons.txt', 'w') as successful_file, \ # open('logs/error_polygons.txt', 'w') as error_file, \ # open('logs/uprooted_polygons.txt', 'w') as uprooted_file, \ # open('logs/non_uprooted_polygons.txt', 'w') as non_uprooted_file: # for polygonID in polygonID_list[1:3]: # try: # print(polygonID) # dates, UID, fieldID, fieldName = pull_data_and_images_to_json(file_path, polygonID) # #print(dates) # sorted_dates = sorted(dates) # isCloudy_flag = {} # parameter = "TCI" # image_dir = f"{UID}/{fieldID}/{polygonID}/images_test/{parameter}" # for day in sorted_dates: # image_path = f"{image_dir}/{parameter}_{day}.png" # isCloudy = detect_clouds(image_path) # isCloudy_flag[day] = isCloudy # #print(isCloudy_flag) # compare_dates = find_nonCloudy_dates(isCloudy_flag) # print(compare_dates) # sorted_dates = sorted(compare_dates) # img1_path = f"{image_dir}/{parameter}_{sorted_dates[0]}.png" # Path to the first image # img2_path = f"{image_dir}/{parameter}_{sorted_dates[1]}.png" # Path to the first image # output_path = f"{image_dir}/comparison_{sorted_dates[0]}_{sorted_dates[1]}.png" # text = f"FieldID: {fieldID}, FieldName: {fieldName}, PolygonID: {polygonID}" # uprooted_area_flag = detect_color_change(img1_path, img2_path, output_path, text=text) # if uprooted_area_flag == True: # uprooted_polygons.append(polygonID) # uprooted_file.write(f"{polygonID}\n") # else: # not_uprooted_polygons.append(polygonID) # non_uprooted_file.write(f"{polygonID}\n") # # Log successful polygon ID # successful_file.write(f"{polygonID}\n") # except Exception as e: # print(f"Error processing PolygonID {polygonID}: {e}") # # Log the polygon ID that encountered an error # error_file.write(f"{polygonID}\n")