"""Build an HTML satellite report covering every monitored field of one user.

For each field stored under ``/PaidMonitoredFields/PMF/<UID>`` in the Firebase
Realtime Database the script fetches the latest "hybrid" field image,
measures the share of each palette colour in it, and collects the result in
``data_list``.  The collected rows are then injected as JSON into
``report_template_3.html`` and written to ``<YYYYMMDD>_<UID[:5]>.html``.

NOTE: the whole pipeline runs at module level (exactly as the original
script did), so importing this file triggers network and file I/O.
"""

import json
import threading
from datetime import datetime

import firebase_admin
import requests
from firebase_admin import credentials, db

# Account whose fields are reported.  An earlier revision first assigned
# "CFv7IjeJR8ZKFWSXb95qzgvymCv1" (Kapil farmonaut) and then overwrote it;
# only the effective value is kept.
UID = "HF9dSIoDEYcTwebrRWVzb08SQln2"  # loni_dcm

# Upper bound for every HTTP call so that a stalled endpoint cannot block a
# worker thread (and therefore the whole batch) forever.
REQUEST_TIMEOUT_SECONDS = 120

# RGBA value -> report label.  Fully transparent pixels (0, 0, 0, 0) are
# deliberately absent: they are not part of the field and must not count
# towards the total.
_PALETTE = {
    (255, 255, 255, 255): "White",
    (17, 167, 95, 255): "Green",  # #11a75f
    (145, 16, 44, 255): "Red",  # #91102c
    (234, 79, 59, 255): "Orange",  # #ea4f3b
    (60, 19, 97, 255): "Purple",  # #3c1361
}


def get_sesnsed_days(UID, FieldID):
    """POST to the ``getSensedDays`` cloud function and return the response.

    Args:
        UID: User identifier sent as ``"UID"``.
        FieldID: Field identifier sent as ``"FieldID"``.

    Returns:
        The raw ``requests.Response``; the caller decides how to decode it.
    """
    endpointUrl = (
        "https://us-central1-farmbase-b2f7e.cloudfunctions.net/getSensedDays"
    )
    bodyObj = {
        "UID": UID,
        "FieldID": FieldID,
    }
    return requests.post(
        endpointUrl, json=bodyObj, timeout=REQUEST_TIMEOUT_SECONDS
    )


def get_sat_image(UID, fieldId, SensedDay, ImageType):
    """Return the URL of a field image from the ``getFieldImage`` function.

    Args:
        UID: User identifier.
        fieldId: Field identifier.
        SensedDay: Day identifier forwarded as ``"SensedDay"``.
        ImageType: Image product name forwarded as ``"ImageType"``.

    Returns:
        The value of the ``"url"`` key of the JSON response.

    Raises:
        requests.HTTPError: The endpoint answered with an error status.
        KeyError: The JSON response carries no ``"url"`` key.
    """
    endpointUrl = (
        "https://us-central1-farmbase-b2f7e.cloudfunctions.net/getFieldImage"
    )
    bodyObj = {
        "UID": UID,
        "FieldID": fieldId,
        "ImageType": ImageType,
        "SensedDay": SensedDay,
        "Colormap": "1",
    }
    response = requests.post(
        endpointUrl, json=bodyObj, timeout=REQUEST_TIMEOUT_SECONDS
    )
    # Fail with the real HTTP error instead of an opaque JSON/KeyError.
    response.raise_for_status()
    return response.json()["url"]


def _download_image(url):
    """Download *url* and return it as a PIL image, or ``None`` on non-200."""
    from io import BytesIO

    from PIL import Image

    response = requests.get(url, timeout=REQUEST_TIMEOUT_SECONDS)
    if response.status_code != 200:
        print("Failed to download image")
        return None
    return Image.open(BytesIO(response.content))


def _color_percentages(image_array):
    """Return ``{label: percent}`` for the palette colours in an RGBA array.

    The percentages are relative to the number of pixels that match *any*
    palette colour.  When nothing matches, every share is ``0.0`` rather
    than the NaN a plain division would produce.
    """
    import numpy as np

    counts = {
        name: int(np.sum(np.all(image_array == rgba, axis=-1)))
        for rgba, name in _PALETTE.items()
    }
    total_pixels = sum(counts.values())
    if total_pixels == 0:
        return {name: 0.0 for name in counts}
    return {
        name: (count / total_pixels) * 100 for name, count in counts.items()
    }


def calculate_pixel_count(image_url):
    """Measure the share of each palette colour in the image at *image_url*.

    Args:
        image_url: URL of the image to analyse.

    Returns:
        A dict with the keys ``"White"``, ``"Green"``, ``"Red"``,
        ``"Orange"`` and ``"Purple"`` mapping to percentages (floats).  All
        values are ``0.0`` when the download fails or no palette pixel is
        present.
    """
    import numpy as np

    image = _download_image(image_url)
    if image is None:
        # Best effort, as before: keep the field in the report, but with
        # well-defined zeros instead of "nan%".
        return {name: 0.0 for name in _PALETTE.values()}

    # The palette is expressed as RGBA; normalise RGB/palette-mode images so
    # the 4-channel comparison is always valid.
    image_array = np.array(image.convert("RGBA"))
    return _color_percentages(image_array)


def _format_area(area):
    """Render the stored field area as ``"<n> sq m"`` (``"N/A"`` if absent)."""
    if area is None:
        return "N/A"
    try:
        return "{:,} sq m".format(area)
    except (TypeError, ValueError):
        # Non-numeric value (e.g. a string): show it without grouping.
        return f"{area} sq m"


def _coordinate_sort_key(key):
    """Order coordinate keys: ``"a"`` -> 0, ``"P_<n>"`` -> n, others last."""
    if key == "a":
        return 0
    if key.startswith("P_"):
        try:
            return int(key[2:])
        except ValueError:
            pass
    # Unknown keys used to yield None and crash sorted(); push them to the end.
    return float("inf")


def process_one_field_data(fieldId):
    """Collect the report row for one field and append it to ``data_list``.

    The field is skipped (nothing appended) when it has no ``LatestDay`` or
    no ``Coordinates`` entry.

    Args:
        fieldId: Key of the field below ``/PaidMonitoredFields/PMF/<UID>``.
    """
    ref = db.reference(f"/PaidMonitoredFields/PMF/{UID}/{fieldId}")

    # First available attribute wins as display name; empty string otherwise.
    fieldName = ""
    for attribute in ("FieldDescription", "FieldAddress", "Name", "Phone"):
        candidate = ref.child(attribute).get()
        if candidate is not None:
            fieldName = candidate
            break

    fieldArea = _format_area(ref.child("FieldArea").get())

    lastSatelliteVisit = ref.child("LatestDay").get()
    if lastSatelliteVisit is None:
        return

    coordinates_db = ref.child("Coordinates").get()
    if coordinates_db is None:
        return

    lat_lon_pairs = []
    for point in sorted(coordinates_db.keys(), key=_coordinate_sort_key):
        coordinates = coordinates_db[point]
        lat_lon_pairs.append(
            [float(coordinates["Latitude"]), float(coordinates["Longitude"])]
        )
    if not lat_lon_pairs:
        # Without points there is no bounding box to compute.
        return
    # Repeat the first vertex so the outline is a closed ring.
    lat_lon_pairs.append(lat_lon_pairs[0])

    latitudes = [pair[0] for pair in lat_lon_pairs]
    longitudes = [pair[1] for pair in lat_lon_pairs]

    ImageType = "hybrid"
    image_url = get_sat_image(UID, fieldId, lastSatelliteVisit, ImageType)
    percentage_data = calculate_pixel_count(image_url)

    row_dict = {
        "fieldName": fieldName,
        "fieldArea": fieldArea,
        "lastSatelliteVisit": datetime.strptime(
            lastSatelliteVisit, "%Y%m%d"
        ).strftime("%d %B %Y"),
        "coordinates": lat_lon_pairs,
        "maxLat": max(latitudes),
        "maxLon": max(longitudes),
        "minLat": min(latitudes),
        "minLon": min(longitudes),
        "url": image_url,
        "Satellite_Data": {
            "white": "{:.2f}%".format(float(percentage_data["White"])),
            "green": "{:.2f}%".format(float(percentage_data["Green"])),
            "orange": "{:.2f}%".format(float(percentage_data["Orange"])),
            "purple": "{:.2f}%".format(float(percentage_data["Purple"])),
            "red": "{:.2f}%".format(float(percentage_data["Red"])),
        },
    }
    # Called from several worker threads; a single list.append is sufficient
    # here because no read-modify-write on the list is involved.
    data_list.append(row_dict)


#
# Config/Setup
# -----------------------------------------------------------------------------
database_url = "https://farmbase-b2f7e-31c0c.firebaseio.com/"
cred = credentials.Certificate(
    "kapil_key_farmbase-b2f7e-firebase-adminsdk-st5nt-aeaad650be.json"
)
# Reuse the default app when it already exists (e.g. re-run in the same
# interpreter); get_app() raises ValueError when it does not.
try:
    default_app = firebase_admin.get_app()
except ValueError:
    default_app = firebase_admin.initialize_app(
        cred, {"databaseURL": database_url}
    )

# Get all FieldIDs of the user.  A shallow read returns only the keys; it is
# None when the user has no fields at all.
str_ref_db_userId = "/PaidMonitoredFields/PMF/" + UID
ref = db.reference(str_ref_db_userId)
FieldIDs = list(ref.get(shallow=True) or {})

data_list = []

# Process the fields in batches so that at most `batch_no` threads (and
# therefore HTTP/database requests) are in flight at once.
batch_no = 20
for batch_start in range(0, len(FieldIDs), batch_no):
    batch_stop = batch_start + batch_no
    print(batch_start, batch_stop)
    threads = [
        threading.Thread(target=process_one_field_data, args=(fieldId,))
        for fieldId in FieldIDs[batch_start:batch_stop]
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # Wait for the whole batch before starting the next one

# Convert the list of dictionaries to a JSON string
json_data = json.dumps(data_list, indent=2)

# Read the content of the HTML file
template_file_name = "report_template_3.html"
file_name = datetime.today().strftime("%Y%m%d") + "_" + UID[:5] + ".html"

with open(template_file_name, "r", encoding="utf-8") as file:
    content_str = file.read()

# NOTE(review): field names come from the database and are inserted into the
# HTML unescaped; confirm the template embeds the JSON in a safe context.
content_str = content_str.replace("DATA_TO_REPLACE", json_data)  # Report Data
content_str = content_str.replace(
    "DATE_TO_REPLACE", datetime.today().strftime("%d %B %Y")
)  # Report Date

# Save the modified HTML content.  Written as UTF-8 to match how the
# template was read; the platform default encoding may not be able to
# represent every character of the template.
with open(file_name, "w", encoding="utf-8") as f:
    f.write(content_str)