import psutil import multiprocessing import re import pandas as pd import numpy as np from typing import List, Dict import os import joblib def get_optimal_workers() -> int: """ Calcola il numero ottimale di workers basandosi sulle risorse del sistema. Returns ------- int Numero ottimale di workers """ # Ottiene il numero di CPU logiche (inclusi i thread virtuali) cpu_count = multiprocessing.cpu_count() # Ottiene la memoria totale e disponibile in GB memory = psutil.virtual_memory() total_memory_gb = memory.total / (1024 ** 3) available_memory_gb = memory.available / (1024 ** 3) # Stima della memoria necessaria per worker (esempio: 2GB per worker) memory_per_worker_gb = 2 # Calcola il numero massimo di workers basato sulla memoria disponibile max_workers_by_memory = int(available_memory_gb / memory_per_worker_gb) # Usa il minimo tra: # - numero di CPU disponibili - 1 (lascia una CPU libera per il sistema) # - numero massimo di workers basato sulla memoria # - un limite massimo arbitrario (es. 32) per evitare troppo overhead optimal_workers = min( cpu_count - 1, max_workers_by_memory, 32 # limite massimo arbitrario ) # Assicura almeno 1 worker return max(1, optimal_workers) def clean_column_name(name: str) -> str: """ Rimuove caratteri speciali e spazi, converte in snake_case e abbrevia. Parameters ---------- name : str Nome della colonna da pulire Returns ------- str Nome della colonna pulito """ # Rimuove caratteri speciali name = re.sub(r'[^a-zA-Z0-9\s]', '', name) # Converte in snake_case name = name.lower().replace(' ', '_') # Abbreviazioni comuni abbreviations = { 'production': 'prod', 'percentage': 'pct', 'hectare': 'ha', 'tonnes': 't', 'litres': 'l', 'minimum': 'min', 'maximum': 'max', 'average': 'avg' } for full, abbr in abbreviations.items(): name = name.replace(full, abbr) return name def clean_column_names(df: pd.DataFrame) -> List[str]: """ Pulisce tutti i nomi delle colonne in un DataFrame. Parameters ---------- df : pd.DataFrame DataFrame con le colonne da pulire Returns ------- list Lista dei nuovi nomi delle colonne puliti """ new_columns = [] for col in df.columns: # Usa regex per separare le varietà varieties = re.findall(r'([a-z]+)_([a-z_]+)', col) if varieties: new_columns.append(f"{varieties[0][0]}_{varieties[0][1]}") else: new_columns.append(col) return new_columns def to_camel_case(text: str) -> str: """ Converte una stringa in camelCase. Gestisce stringhe con spazi, trattini o underscore. Se è una sola parola, la restituisce in minuscolo. Parameters ---------- text : str Testo da convertire Returns ------- str Testo convertito in camelCase """ # Rimuove eventuali spazi iniziali e finali text = text.strip() # Se la stringa è vuota, ritorna stringa vuota if not text: return "" # Sostituisce trattini e underscore con spazi text = text.replace('-', ' ').replace('_', ' ') # Divide la stringa in parole words = text.split() # Se non ci sono parole dopo lo split, ritorna stringa vuota if not words: return "" # Se c'è una sola parola, ritorna in minuscolo if len(words) == 1: return words[0].lower() # Altrimenti procedi con il camelCase result = words[0].lower() for word in words[1:]: result += word.capitalize() return result def get_full_data(simulated_data: pd.DataFrame, olive_varieties: pd.DataFrame) -> pd.DataFrame: """ Ottiene il dataset completo combinando dati simulati e varietà di olive. Parameters ---------- simulated_data : pd.DataFrame DataFrame con i dati simulati olive_varieties : pd.DataFrame DataFrame con le informazioni sulle varietà Returns ------- pd.DataFrame DataFrame completo con tutte le informazioni """ # Colonne base rilevanti relevant_columns = [ 'year', 'temp_mean', 'precip_sum', 'solar_energy_sum', 'ha', 'zone', 'olive_prod' ] # Aggiungi colonne specifiche per varietà all_varieties = olive_varieties['Varietà di Olive'].unique() varieties = [clean_column_name(variety) for variety in all_varieties] for variety in varieties: relevant_columns.extend([ f'{variety}_olive_prod', f'{variety}_tech' ]) # Seleziona solo le colonne rilevanti full_data = simulated_data[relevant_columns].copy() # Aggiungi feature calcolate for variety in varieties: # Calcola efficienza produttiva if f'{variety}_olive_prod' in full_data.columns: full_data[f'{variety}_efficiency'] = ( full_data[f'{variety}_olive_prod'] / full_data['ha'] ) # Aggiungi indicatori tecnici if f'{variety}_tech' in full_data.columns: technique_dummies = pd.get_dummies( full_data[f'{variety}_tech'], prefix=f'{variety}_technique' ) full_data = pd.concat([full_data, technique_dummies], axis=1) # Aggiungi feature temporali full_data['month'] = 1 # Assumiamo dati annuali full_data['day'] = 1 # Assumiamo dati annuali # Calcola medie mobili for col in ['temp_mean', 'precip_sum', 'solar_energy_sum']: full_data[f'{col}_ma3'] = full_data[col].rolling(window=3, min_periods=1).mean() full_data[f'{col}_ma5'] = full_data[col].rolling(window=5, min_periods=1).mean() return full_data def prepare_static_features_multiple(varieties_info: List[Dict], percentages: List[float], hectares: float, all_varieties: List[str]) -> np.ndarray: """ Prepara le feature statiche per multiple varietà. Parameters ---------- varieties_info : List[Dict] Lista di dizionari contenenti le informazioni sulle varietà selezionate percentages : List[float] Lista delle percentuali corrispondenti a ciascuna varietà selezionata hectares : float Numero di ettari totali all_varieties : List[str] Lista di tutte le possibili varietà nel dataset originale Returns ------- np.ndarray Array numpy contenente tutte le feature statiche """ # Inizializza un dizionario per tutte le varietà possibili variety_data = {variety.lower(): { 'pct': 0, 'prod_t_ha': 0, 'tech': '', 'oil_prod_t_ha': 0, 'oil_prod_l_ha': 0, 'min_yield_pct': 0, 'max_yield_pct': 0, 'min_oil_prod_l_ha': 0, 'max_oil_prod_l_ha': 0, 'avg_oil_prod_l_ha': 0, 'l_per_t': 0, 'min_l_per_t': 0, 'max_l_per_t': 0, 'avg_l_per_t': 0, 'water_need_spring': 0, 'water_need_summer': 0, 'water_need_autumn': 0, 'water_need_winter': 0, 'annual_water_need': 0, 'optimal_temp': 0, 'drought_resistance': 0 } for variety in all_varieties} # Aggiorna i dati per le varietà selezionate for variety_info, percentage in zip(varieties_info, percentages): variety_name = clean_column_name(variety_info['variet_di_olive']).lower() technique = clean_column_name(variety_info['tecnica_di_coltivazione']).lower() if variety_name not in variety_data: print(f"Attenzione: La varietà '{variety_name}' non è presente nella lista delle varietà conosciute.") continue variety_data[variety_name].update({ 'pct': percentage / 100, 'prod_t_ha': variety_info['produzione_tonnellateettaro'], 'tech': technique, 'oil_prod_t_ha': variety_info['produzione_olio_tonnellateettaro'], 'oil_prod_l_ha': variety_info['produzione_olio_litriettaro'], 'min_yield_pct': variety_info['min__resa'], 'max_yield_pct': variety_info['max__resa'], 'min_oil_prod_l_ha': variety_info['min_produzione_olio_litriettaro'], 'max_oil_prod_l_ha': variety_info['max_produzione_olio_litriettaro'], 'avg_oil_prod_l_ha': variety_info['media_produzione_olio_litriettaro'], 'l_per_t': variety_info['litri_per_tonnellata'], 'min_l_per_t': variety_info['min_litri_per_tonnellata'], 'max_l_per_t': variety_info['max_litri_per_tonnellata'], 'avg_l_per_t': variety_info['media_litri_per_tonnellata'], 'water_need_spring': variety_info['fabbisogno_acqua_primavera_mettaro'], 'water_need_summer': variety_info['fabbisogno_acqua_estate_mettaro'], 'water_need_autumn': variety_info['fabbisogno_acqua_autunno_mettaro'], 'water_need_winter': variety_info['fabbisogno_acqua_inverno_mettaro'], 'annual_water_need': variety_info['fabbisogno_idrico_annuale_mettaro'], 'optimal_temp': variety_info['temperatura_ottimale'], 'drought_resistance': variety_info['resistenza_alla_siccit'] }) # Crea il vettore delle feature static_features = [hectares] # Lista delle feature per ogni varietà variety_features = ['pct', 'prod_t_ha', 'oil_prod_t_ha', 'oil_prod_l_ha', 'min_yield_pct', 'max_yield_pct', 'min_oil_prod_l_ha', 'max_oil_prod_l_ha', 'avg_oil_prod_l_ha', 'l_per_t', 'min_l_per_t', 'max_l_per_t', 'avg_l_per_t', 'water_need_spring', 'water_need_summer', 'water_need_autumn', 'water_need_winter', 'annual_water_need', 'optimal_temp', 'drought_resistance'] # Appiattisci i dati delle varietà for variety in all_varieties: variety_lower = variety.lower() # Feature esistenti for feature in variety_features: static_features.append(variety_data[variety_lower][feature]) # Feature binarie per le tecniche for technique in ['tradizionale', 'intensiva', 'superintensiva']: static_features.append(1 if variety_data[variety_lower]['tech'] == technique else 0) return np.array(static_features).reshape(1, -1) def get_feature_names(all_varieties: List[str]) -> List[str]: """ Genera i nomi delle feature nell'ordine corretto. Parameters ---------- all_varieties : List[str] Lista di tutte le varietà possibili Returns ------- List[str] Lista dei nomi delle feature """ feature_names = ['hectares'] variety_features = ['pct', 'prod_t_ha', 'oil_prod_t_ha', 'oil_prod_l_ha', 'min_yield_pct', 'max_yield_pct', 'min_oil_prod_l_ha', 'max_oil_prod_l_ha', 'avg_oil_prod_l_ha', 'l_per_t', 'min_l_per_t', 'max_l_per_t', 'avg_l_per_t'] techniques = ['tradizionale', 'intensiva', 'superintensiva'] for variety in all_varieties: for feature in variety_features: feature_names.append(f"{variety}_{feature}") for technique in techniques: feature_names.append(f"{variety}_tech_{technique}") return feature_names def add_controlled_variation(base_value: float, max_variation_pct: float = 0.20) -> float: """ Aggiunge una variazione controllata a un valore base. Parameters ---------- base_value : float Valore base da modificare max_variation_pct : float Percentuale massima di variazione (default 20%) Returns ------- float Valore con variazione applicata """ variation = np.random.uniform(-max_variation_pct, max_variation_pct) return base_value * (1 + variation) def get_growth_phase(month): if month in [12, 1, 2]: return 'dormancy' elif month in [3, 4, 5]: return 'flowering' elif month in [6, 7, 8]: return 'fruit_set' else: return 'ripening' def calculate_weather_effect(row, optimal_temp): # Effetti base temp_effect = -0.1 * (row['temp_mean'] - optimal_temp) ** 2 rain_effect = -0.05 * (row['precip_sum'] - 600) ** 2 / 10000 sun_effect = 0.1 * row['solarenergy_sum'] / 1000 # Fattori di scala basati sulla fase di crescita if row['growth_phase'] == 'dormancy': temp_scale = 0.5 rain_scale = 0.2 sun_scale = 0.1 elif row['growth_phase'] == 'flowering': temp_scale = 2.0 rain_scale = 1.5 sun_scale = 1.0 elif row['growth_phase'] == 'fruit_set': temp_scale = 1.5 rain_scale = 1.0 sun_scale = 0.8 else: # ripening temp_scale = 1.0 rain_scale = 0.5 sun_scale = 1.2 # Calcolo dell'effetto combinato combined_effect = ( temp_scale * temp_effect + rain_scale * rain_effect + sun_scale * sun_effect ) # Aggiustamenti specifici per fase if row['growth_phase'] == 'flowering': combined_effect -= 0.5 * max(0, row['precip_sum'] - 50) # Penalità per pioggia eccessiva durante la fioritura elif row['growth_phase'] == 'fruit_set': combined_effect += 0.3 * max(0, row['temp_mean'] - (optimal_temp + 5)) # Bonus per temperature più alte durante la formazione dei frutti return combined_effect def calculate_water_need(weather_data, base_need, optimal_temp): # Calcola il fabbisogno idrico basato su temperatura e precipitazioni temp_factor = 1 + 0.05 * (weather_data['temp_mean'] - optimal_temp) # Aumenta del 5% per ogni grado sopra l'ottimale rain_factor = 1 - 0.001 * weather_data['precip_sum'] # Diminuisce leggermente con l'aumentare delle precipitazioni return base_need * temp_factor * rain_factor def create_technique_mapping(olive_varieties, mapping_path='./sources/technique_mapping.joblib'): # Estrai tutte le tecniche uniche dal dataset e convertile in lowercase all_techniques = olive_varieties['Tecnica di Coltivazione'].str.lower().unique() # Crea il mapping partendo da 1 technique_mapping = {tech: i + 1 for i, tech in enumerate(sorted(all_techniques))} # Salva il mapping os.makedirs(os.path.dirname(mapping_path), exist_ok=True) joblib.dump(technique_mapping, mapping_path) return technique_mapping def encode_techniques(df, mapping_path='./sources/technique_mapping.joblib'): if not os.path.exists(mapping_path): raise FileNotFoundError(f"Mapping not found at {mapping_path}. Run create_technique_mapping first.") technique_mapping = joblib.load(mapping_path) # Trova tutte le colonne delle tecniche tech_columns = [col for col in df.columns if col.endswith('_tech')] # Applica il mapping a tutte le colonne delle tecniche for col in tech_columns: df[col] = df[col].str.lower().map(technique_mapping).fillna(0).astype(int) return df def decode_techniques(df, mapping_path='./sources/technique_mapping.joblib'): if not os.path.exists(mapping_path): raise FileNotFoundError(f"Mapping not found at {mapping_path}") technique_mapping = joblib.load(mapping_path) reverse_mapping = {v: k for k, v in technique_mapping.items()} reverse_mapping[0] = '' # Aggiungi un mapping per 0 a stringa vuota # Trova tutte le colonne delle tecniche tech_columns = [col for col in df.columns if col.endswith('_tech')] # Applica il reverse mapping a tutte le colonne delle tecniche for col in tech_columns: df[col] = df[col].map(reverse_mapping) return df def decode_single_technique(technique_value, mapping_path='./sources/technique_mapping.joblib'): if not os.path.exists(mapping_path): raise FileNotFoundError(f"Mapping not found at {mapping_path}") technique_mapping = joblib.load(mapping_path) reverse_mapping = {v: k for k, v in technique_mapping.items()} reverse_mapping[0] = '' return reverse_mapping.get(technique_value, '') def preprocess_weather_data(weather_df): # Calcola statistiche mensili per ogni anno monthly_weather = weather_df.groupby(['year', 'month']).agg({ 'temp': ['mean', 'min', 'max'], 'humidity': 'mean', 'precip': 'sum', 'windspeed': 'mean', 'cloudcover': 'mean', 'solarradiation': 'sum', 'solarenergy': 'sum', 'uvindex': 'max' }).reset_index() monthly_weather.columns = ['year', 'month'] + [f'{col[0]}_{col[1]}' for col in monthly_weather.columns[2:]] return monthly_weather