# -*- coding: utf-8 -*-
"""AWD.ipynb

Automatically generated by Colab.
"""

import re
import traceback
from datetime import datetime

import firebase_admin
from firebase_admin import credentials, db
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
import numpy as np

# Initialise the default Firebase app used by the db.reference() calls below.
cred = credentials.Certificate("servicekey.json")
firebase_admin.initialize_app(
    cred, {"databaseURL": "https://farmbase-b2f7e-31c0c.firebaseio.com/"}
)


def handle_snapshot(snapshot):
    """Print and return the value held by a database snapshot.

    Args:
        snapshot: Object exposing a ``val()`` method.

    Returns:
        Whatever ``snapshot.val()`` returns; callers can process it further.
    """
    data = snapshot.val()
    print("Retrieved data:", data)
    return data


def check_dates_in_range(dates, start_date, end_date):
    """Return the date strings that fall inside ``[start_date, end_date]``.

    Args:
        dates: Iterable of ``'YYYYMMDD'`` strings.
        start_date: Inclusive lower bound (``datetime``).
        end_date: Inclusive upper bound (``datetime``).

    Returns:
        List of the matching strings, in their input order.

    Raises:
        ValueError: If a string does not parse as ``'%Y%m%d'``.
    """
    return [
        date_str
        for date_str in dates
        if start_date <= datetime.strptime(date_str, '%Y%m%d') <= end_date
    ]


def mmdd_to_days(mmdd_date):
    """Convert an ``'MMDD'`` string to its day-of-year number.

    The month table is for a non-leap year (February has 28 days), so the
    result is one short for dates after 28 February in a leap year.
    Differences between two dates on the same side of that day are unaffected.

    Args:
        mmdd_date: Four-character string, month then day (e.g. ``'0515'``).

    Returns:
        Number of days since the start of the year (1 for ``'0101'``).
    """
    month = int(mmdd_date[:2])
    day = int(mmdd_date[2:])
    # Leading 0 so that days_in_month[:month] sums the months *before* `month`.
    days_in_month = [0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
    return sum(days_in_month[:month]) + day


########################## extracting information about the field ##########################

def _parse_index_series(raw_series):
    """Turn a ``{date: raw string}`` index series into ``{date: float}``.

    Keys are processed in sorted order. Digit-only strings are percentages
    and are divided by 100; any other non-``'NaN'`` string is parsed with
    ``float``. A ``'NaN'`` reading is replaced by the most recent valid
    reading (``None`` when there is none yet) and its key is recorded.

    Returns:
        ``(values, missing_keys)``.
    """
    values = {}
    missing_keys = []
    last_valid = None
    for key in sorted(raw_series):
        raw = raw_series[key]
        if raw == 'NaN':
            values[key] = last_valid
            missing_keys.append(key)
            continue
        values[key] = int(raw) / 100 if raw.isnumeric() else float(raw)
        # Remember every valid reading, whichever format it arrived in, so a
        # later 'NaN' is filled with the genuinely previous value.
        last_valid = values[key]
    return values, missing_keys


def _scale_rsm_to_ndwi(rsm_values):
    """Shift RSM readings of 0.6 and above down by 0.2, in place.

    NOTE(review): a series that starts with 'NaN' holds ``None`` here and the
    comparison raises TypeError, as it always has; the caller's per-field
    error handling then skips the field. Decide whether such days should be
    dropped instead.
    """
    for key, value in rsm_values.items():
        if value >= 0.6:
            rsm_values[key] = value - 0.2


def get_fieldobj(path_fieldobj, UID, fieldID, start_date, end_date):
    """Fetch one field from Firebase and build its irrigation time series.

    For every sensed day inside ``[start_date, end_date]`` that has an NDWI
    or RSM reading, one value is chosen:

    * cloudy day (``isCloudy`` is true): RSM if present, otherwise NDWI;
    * clear day, or a day stored as the plain string ``"yes"``: NDWI.

    Args:
        path_fieldobj: Database path under which users' fields are stored.
        UID: User id (child of ``path_fieldobj``).
        fieldID: Field id (child of ``UID``).
        start_date: Inclusive start of the reporting window (``datetime``).
        end_date: Inclusive end of the reporting window (``datetime``).

    Returns:
        Tuple ``(latitude, longitude, crop_irrigation, data_na_ndwi,
        data_na_rsm, field_address, field_description, last_sensed_day)``
        where ``crop_irrigation`` maps ``'YYYYMMDD'`` to the chosen value and
        the two ``data_na_*`` lists hold the dates whose raw reading was
        ``'NaN'``.

    Raises:
        KeyError: If the field lacks ``Coordinates``, ``Health`` or
            ``SensedDays``, or a clear day has an RSM reading but no NDWI.
        TypeError: If the RSM series starts with ``'NaN'``.
        IndexError: If ``SensedDays`` is empty.
    """
    field_obj = db.reference(path_fieldobj).child(UID).child(fieldID).get()

    field_description = field_obj.get("FieldDescription", "NA")
    field_address = field_obj.get("FieldAddress", "NA")
    point_a = field_obj["Coordinates"]["a"]
    latitude = point_a["Latitude"]
    longitude = point_a["Longitude"]

    health_obj = field_obj["Health"]
    ndwi_obj, data_na_ndwi = {}, []
    rsm_obj, data_na_rsm = {}, []
    if len(health_obj) > 0:
        ndwi_obj, data_na_ndwi = _parse_index_series(health_obj["ndwi"])
        rsm_obj, data_na_rsm = _parse_index_series(health_obj["rsm"])
        _scale_rsm_to_ndwi(rsm_obj)

    sensed_days = field_obj["SensedDays"]
    sensed_days_sorted = sorted(sensed_days)
    last_sensed_day = sensed_days_sorted[-1]

    crop_irrigation = {}
    for day in check_dates_in_range(sensed_days_sorted, start_date, end_date):
        if day not in ndwi_obj and day not in rsm_obj:
            continue  # sensed, but no reading of either kind for that day
        entry = sensed_days[day]
        if entry == "yes":
            crop_irrigation[day] = ndwi_obj[day]
            continue
        is_cloudy = entry["isCloudy"]
        if is_cloudy == True:  # noqa: E712 - keep the original loose comparison
            # Prefer RSM under cloud cover; look NDWI up only when needed so
            # an RSM-only day no longer raises KeyError.
            crop_irrigation[day] = rsm_obj[day] if day in rsm_obj else ndwi_obj[day]
        elif is_cloudy == False:  # noqa: E712
            crop_irrigation[day] = ndwi_obj[day]

    return (latitude, longitude, crop_irrigation, data_na_ndwi, data_na_rsm,
            field_address, field_description, last_sensed_day)


################# calculating duration of a phase #################

def calculate_duration(all_dates, wet_phases, dry_phases, unpred_phases):
    """Collapse per-date phase labels into contiguous phase intervals.

    Args:
        all_dates: Ordered date strings covering the whole series.
        wet_phases: Dates labelled wet.
        dry_phases: Dates labelled dry.
        unpred_phases: Dates labelled unpredictable.

    Returns:
        List of ``(phase, start, end)`` tuples with ``phase`` one of
        ``'wet'``, ``'dry'``, ``'unpred'``. An interval ends on the date the
        next phase starts; the last one ends on ``all_dates[-1]``. Dates in
        none of the three lists extend the current interval. Empty input
        yields an empty list.
    """
    if not all_dates:
        return []

    # Later updates win, so wet takes precedence over dry over unpred when a
    # date appears in more than one list (same order as the former if/elif).
    phase_of = {}
    for name, members in (('unpred', unpred_phases), ('dry', dry_phases), ('wet', wet_phases)):
        for member in members:
            phase_of[member] = name

    durations = []
    current_phase = None
    start_date = all_dates[0]
    for date in all_dates:
        phase = phase_of.get(date)
        if phase is None or phase == current_phase:
            continue
        if current_phase is not None:
            durations.append((current_phase, start_date, date))
        start_date = date
        current_phase = phase

    if current_phase is not None:
        durations.append((current_phase, start_date, all_dates[-1]))
    return durations


#################################### Function for assigning phases ####################################

def phase_labels(field_info):
    """Label each step of a field's irrigation series as wet, dry or flat.

    The first and last entries of ``field_info["cropIrrigation"]`` are
    dropped. Each consecutive pair of the remaining values gets a label from
    its difference: ``1`` (wet) above +0.03, ``0`` (dry) below -0.03 and
    ``2`` (unpredictable) otherwise, including a NaN difference. Label ``i``
    is attached to date ``i``. All returned dates except ``dates`` are
    reduced to ``'MMDD'``.

    Dry intervals longer than 10 and shorter than 20 days are renamed
    ``'prolonged_dry'``; 20 days or more become ``'harvest_predicted'``.

    Args:
        field_info: Dict with keys ``"cropIrrigation"`` (``{YYYYMMDD: value}``),
            ``"data_na_ndwi"`` and ``"data_na_rsm"`` (lists of ``YYYYMMDD``).

    Returns:
        Tuple ``(ndwi_data, dates, farm_labels, wet_phases, dry_phases,
        unpred_phases, modified_durations, data_na_ndwi, data_na_rsm)`` where
        ``modified_durations`` holds ``(phase, start, end, duration_days)``.
    """
    irrigation = field_info["cropIrrigation"]
    ndwi_data = np.array(list(irrigation.values()))[1:-1]
    data_na_ndwi = [date[4:] for date in field_info["data_na_ndwi"]]
    data_na_rsm = [date[4:] for date in field_info["data_na_rsm"]]

    farm_labels = []
    for previous, current in zip(ndwi_data, ndwi_data[1:]):
        slope = current - previous
        if slope > 0.03:
            farm_labels.append(1)
        elif slope < -0.03:
            farm_labels.append(0)
        else:
            # |slope| <= 0.03, or NaN: always emit a label so that labels and
            # dates stay aligned.
            farm_labels.append(2)

    dates = list(irrigation.keys())[1:-1]
    modified_dates = [date[4:] for date in dates]

    # zip stops at the shorter list: there is one label fewer than dates.
    labelled_dates = list(zip(modified_dates, farm_labels))
    wet_phases = [date for date, label in labelled_dates if label == 1]
    dry_phases = [date for date, label in labelled_dates if label == 0]
    unpred_phases = [date for date, label in labelled_dates if label == 2]

    durations = calculate_duration(modified_dates, wet_phases, dry_phases, unpred_phases)

    modified_durations = []
    for phase, start, end in durations:
        duration_days = mmdd_to_days(end) - mmdd_to_days(start)
        if phase == "dry":
            if 10 < duration_days < 20:
                phase = "prolonged_dry"
            elif duration_days >= 20:
                phase = "harvest_predicted"
        modified_durations.append((phase, start, end, duration_days))

    return (ndwi_data, dates, farm_labels, wet_phases, dry_phases, unpred_phases,
            modified_durations, data_na_ndwi, data_na_rsm)


######################## Function to plot AWD individually, assuming field_address is
# available instead of field_description. Change defaults as per requirement.
############# Plot one field's AWD profile on a supplied axes #############
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from datetime import datetime
import matplotlib.image as mpimg

# Line colour of the phase bar for each phase name; anything else is black.
_PHASE_COLOURS = {
    'wet': '#1982c4',
    'dry': '#ffca3a',
    'prolonged_dry': '#ff595e',
    'unpred': '#B8B8B8',
    'harvest_predicted': '#8ac926',
}


def _compose_field_title(fieldID, fieldAddress, wrap_at=70, threshold=50):
    """Build the 'Field Description ... Field ID ...' subplot title.

    Whitespace runs in the address are collapsed to single spaces. Titles
    longer than ``wrap_at`` characters are word-wrapped; otherwise the field
    id goes on its own line when description plus id exceed ``threshold``
    characters, and on the same line when they do not.
    """
    address = re.sub(r'\s+', ' ', fieldAddress.replace('\t', ' '))
    title = f"Field Description: {address}"

    if len(title) > wrap_at:
        lines = []
        line = ""
        for word in title.split():
            if len(line + word) > wrap_at:
                lines.append(line)
                line = ""
            line += word + " "
        lines.append(line)
        return "\n".join(lines) + f"Field ID: {fieldID}"

    if len(title + fieldID) > threshold:
        return f"Field Description: {address}\nField ID: {fieldID}"
    return f"Field Description: {address} Field ID: {fieldID}"


def plot_AWD_individual_wo_fd_mp(fieldID, fieldAddress, ndwi_data, dates, modified_durations,
                                 data_na_ndwi, data_na_rsm, ax, show_field_description=False):
    """Draw one field's NDWI curve and phase bar on ``ax``.

    Args:
        fieldID: Field identifier shown in the title.
        fieldAddress: Free-text address, used in the title only when
            ``show_field_description`` is true.
        ndwi_data: Sequence of index values, one per entry of ``dates``.
        dates: ``'YYYYMMDD'`` strings, in plotting order.
        modified_durations: ``(phase, start, end, duration_days)`` tuples with
            ``start``/``end`` as ``'MMDD'``.
        data_na_ndwi: ``'MMDD'`` dates whose NDWI reading was missing.
        data_na_rsm: ``'MMDD'`` dates whose RSM reading was missing.
        ax: Matplotlib axes to draw on.
        show_field_description: When false (default) the title is just
            ``"Field ID: <id>"``. When true the address is included.
            NOTE(review): the source this was restored from had lost its
            indentation; the "Field ID only" title is assumed to have applied
            to every subplot. Confirm, and flip the default if not.
    """
    plt.style.use("default")
    plt.rcParams.update({'font.size': 9, 'axes.linewidth': 1})
    plt.rcParams['axes.spines.left'] = True
    plt.rcParams['axes.spines.right'] = False
    plt.rcParams['axes.spines.top'] = False
    plt.rcParams['axes.spines.bottom'] = True
    plt.rcParams.update({'figure.max_open_warning': 0})
    # rcParams only affect axes created afterwards; `ax` already exists, so
    # apply the spine layout to it directly (otherwise the first page of a
    # report looks different from the following ones).
    ax.spines['left'].set_visible(True)
    ax.spines['bottom'].set_visible(True)
    ax.spines['right'].set_visible(False)
    ax.spines['top'].set_visible(False)

    modified_dates = []
    mod_dates_spell = []
    for date in dates:
        modified_date = date[4:]
        # Near-invisible marker that registers every date on the categorical
        # x axis, in order, before the phase bars are drawn.
        ax.plot([modified_date, modified_date], [1.2, 1.2], color='green', linewidth=0.1)
        modified_dates.append(modified_date)
        # Parse the full date: "%m%d" alone defaults to year 1900 and rejects
        # 29 February.
        mod_dates_spell.append(datetime.strptime(date, "%Y%m%d").strftime("%d %b"))

    for phase, start, end, duration_days in modified_durations:
        color = _PHASE_COLOURS.get(phase, "black")
        ax.plot([start, end], [1.2, 1.2], color=color, linewidth=3.5)

    ax.plot(modified_dates, ndwi_data, color='#B3C7D6FF', label='NDWI')

    # Dashed line on days where both NDWI and RSM were missing.
    rsm_missing = set(data_na_rsm)
    common_na = {x for x in data_na_ndwi if x in rsm_missing}
    for x in modified_dates:
        if x in common_na:
            ax.axvline(x, color='black', linestyle='--', linewidth=1.2)

    ax.set_xticks(range(len(dates)))
    ax.set_xticklabels(mod_dates_spell, rotation=90)
    ax.grid(axis='x', linestyle='--')
    ax.set_ylim(0, 1.3)

    if show_field_description:
        title = _compose_field_title(fieldID, fieldAddress)
    else:
        title = f"Field ID: {fieldID}"
    ax.set_title(f"{title}", fontsize=9)


######################### Function to plot AWD profiles in pdf report #########################

def plot_multiple_AWD(results, legend_image_path, output_pdf_path):
    """Write a PDF with ten AWD subplots (5 x 2) per A4-landscape page.

    Args:
        results: ``{fieldID: field_info}`` where ``field_info`` is accepted by
            ``phase_labels`` and has a ``"fieldAddress"`` entry.
        legend_image_path: Image shown centred in each page header.
        output_pdf_path: Destination PDF file.
    """
    plots_per_page = 10
    successful_fieldIDs = list(results.keys())
    total_plots = len(successful_fieldIDs)
    num_pages = (total_plots + plots_per_page - 1) // plots_per_page  # ceiling division

    # Header geometry in figure fractions: legend half a page wide, centred.
    page_width = 11.3
    legend_width = page_width / 2
    left_margin = (page_width - legend_width) / 2

    legend_img = None  # read once, on the first page
    # The context manager finalises the PDF even if a page fails.
    with PdfPages(output_pdf_path) as pdf_pages:
        for page in range(num_pages):
            fig, axs = plt.subplots(5, 2, figsize=(11.7, 8.3))  # A4 landscape, inches
            axs = axs.flatten()

            first = page * plots_per_page
            page_fieldIDs = successful_fieldIDs[first:first + plots_per_page]
            for ax, fieldID in zip(axs, page_fieldIDs):
                print(fieldID)
                field_data = results[fieldID]
                (ndwiData, Dates, farmLabels, wetPhases, dryPhases, unpredPhases,
                 modifiedDurations, data_na_ndwi, data_na_rsm) = phase_labels(field_data)
                fieldAddress = field_data["fieldAddress"]
                plot_AWD_individual_wo_fd_mp(fieldID, fieldAddress, ndwiData, Dates,
                                             modifiedDurations, data_na_ndwi, data_na_rsm, ax)

            # Drop every axes that received no plot (also when exactly one is
            # left over on the page).
            for unused_ax in axs[len(page_fieldIDs):]:
                unused_ax.remove()

            # Legend image as page header.
            legend_ax = fig.add_axes(
                [left_margin / page_width, 0.9, legend_width / page_width, 0.05], anchor='N'
            )
            legend_ax.axis('off')
            if legend_img is None:
                legend_img = mpimg.imread(legend_image_path)
            fig.set_dpi(300)  # Adjust DPI as needed
            legend_ax.imshow(legend_img)

            # Empty, invisible footer area reserved at the bottom of the page.
            footer_ax = fig.add_axes([0.1, 0.02, 0.8, 0.05], anchor='S')
            footer_ax.axis('off')

            # Leave margins on both sides and room for header and footer.
            fig.subplots_adjust(left=0.1, right=0.9, top=0.85, bottom=0.15,
                                hspace=2.5, wspace=0.14)

            pdf_pages.savefig(fig)
            plt.close(fig)


################################################# MAIN BODY #################################################

firebase_database_url = "https://farmbase-b2f7e-31c0c.firebaseio.com//"
UID = "VKXzYbNY89YD7UePtUL5KbxLYTE3"
crop = "paddy"
path_fieldobj = "/PaidMonitoredFields/PMF/"

# Reporting window (inclusive). Previous season: 20230715 to 20231115.
start_date = datetime.strptime('20240515', '%Y%m%d')
end_date = datetime.strptime('20240705', '%Y%m%d')

# Shallow read: only the field ids are needed here, not the field contents.
ref = db.reference(path_fieldobj).child(UID).get(etag=False, shallow=True)
fieldID_list = list(ref.keys())

# Per-field data collected for the report, keyed by fieldID.
results = {}

# Book-keeping of which fields could be processed.
successful_fieldIDs = []
error_fieldIDs = []
successful_runs = 0

for fieldID in fieldID_list:
    try:
        (latitude, longitude, cropIrrigation, data_na_ndwi, data_na_rsm,
         fieldAddress, fieldDescription, last_sensed_day) = get_fieldobj(
            path_fieldobj, UID, fieldID, start_date, end_date)

        results[fieldID] = {
            "latitude": latitude,
            "longitude": longitude,
            "cropIrrigation": cropIrrigation,
            "data_na_ndwi": data_na_ndwi,
            "data_na_rsm": data_na_rsm,
            "fieldAddress": fieldAddress,
            # Key spelling kept as is: other code may already read it.
            "fieldDescritpion": fieldDescription,
            "last_sensed_day": last_sensed_day,
        }

        successful_runs += 1
        successful_fieldIDs.append(fieldID)

    except Exception as e:
        # One bad field must not stop the batch: report it and carry on.
        print(f"Error processing fieldID {fieldID}: {e}")
        print(traceback.format_exc())
        error_fieldIDs.append(fieldID)

print("Error fieldIDs:", error_fieldIDs)

############################################# GENERATE PDF REPORT #############################################
# A legend image is required here.
legend_image_path = 'legend_v4.png'  # Replace with the path to your legend image
output_pdf_path = 'AWD_season2_plots.pdf'
plot_multiple_AWD(results, legend_image_path, output_pdf_path)