504 lines
16 KiB
Python
Executable File

import psutil
import multiprocessing
import re
import pandas as pd
import numpy as np
from typing import List, Dict
import os
import joblib
def get_optimal_workers() -> int:
"""
Calcola il numero ottimale di workers basandosi sulle risorse del sistema.
Returns
-------
int
Numero ottimale di workers
"""
# Ottiene il numero di CPU logiche (inclusi i thread virtuali)
cpu_count = multiprocessing.cpu_count()
# Ottiene la memoria totale e disponibile in GB
memory = psutil.virtual_memory()
total_memory_gb = memory.total / (1024 ** 3)
available_memory_gb = memory.available / (1024 ** 3)
# Stima della memoria necessaria per worker (esempio: 2GB per worker)
memory_per_worker_gb = 2
# Calcola il numero massimo di workers basato sulla memoria disponibile
max_workers_by_memory = int(available_memory_gb / memory_per_worker_gb)
# Usa il minimo tra:
# - numero di CPU disponibili - 1 (lascia una CPU libera per il sistema)
# - numero massimo di workers basato sulla memoria
# - un limite massimo arbitrario (es. 32) per evitare troppo overhead
optimal_workers = min(
cpu_count - 1,
max_workers_by_memory,
32 # limite massimo arbitrario
)
# Assicura almeno 1 worker
return max(1, optimal_workers)
def clean_column_name(name: str) -> str:
"""
Rimuove caratteri speciali e spazi, converte in snake_case e abbrevia.
Parameters
----------
name : str
Nome della colonna da pulire
Returns
-------
str
Nome della colonna pulito
"""
# Rimuove caratteri speciali
name = re.sub(r'[^a-zA-Z0-9\s]', '', name)
# Converte in snake_case
name = name.lower().replace(' ', '_')
# Abbreviazioni comuni
abbreviations = {
'production': 'prod',
'percentage': 'pct',
'hectare': 'ha',
'tonnes': 't',
'litres': 'l',
'minimum': 'min',
'maximum': 'max',
'average': 'avg'
}
for full, abbr in abbreviations.items():
name = name.replace(full, abbr)
return name
def clean_column_names(df: pd.DataFrame) -> List[str]:
"""
Pulisce tutti i nomi delle colonne in un DataFrame.
Parameters
----------
df : pd.DataFrame
DataFrame con le colonne da pulire
Returns
-------
list
Lista dei nuovi nomi delle colonne puliti
"""
new_columns = []
for col in df.columns:
# Usa regex per separare le varietà
varieties = re.findall(r'([a-z]+)_([a-z_]+)', col)
if varieties:
new_columns.append(f"{varieties[0][0]}_{varieties[0][1]}")
else:
new_columns.append(col)
return new_columns
def to_camel_case(text: str) -> str:
"""
Converte una stringa in camelCase.
Gestisce stringhe con spazi, trattini o underscore.
Se è una sola parola, la restituisce in minuscolo.
Parameters
----------
text : str
Testo da convertire
Returns
-------
str
Testo convertito in camelCase
"""
# Rimuove eventuali spazi iniziali e finali
text = text.strip()
# Se la stringa è vuota, ritorna stringa vuota
if not text:
return ""
# Sostituisce trattini e underscore con spazi
text = text.replace('-', ' ').replace('_', ' ')
# Divide la stringa in parole
words = text.split()
# Se non ci sono parole dopo lo split, ritorna stringa vuota
if not words:
return ""
# Se c'è una sola parola, ritorna in minuscolo
if len(words) == 1:
return words[0].lower()
# Altrimenti procedi con il camelCase
result = words[0].lower()
for word in words[1:]:
result += word.capitalize()
return result
def get_full_data(simulated_data: pd.DataFrame,
olive_varieties: pd.DataFrame) -> pd.DataFrame:
"""
Ottiene il dataset completo combinando dati simulati e varietà di olive.
Parameters
----------
simulated_data : pd.DataFrame
DataFrame con i dati simulati
olive_varieties : pd.DataFrame
DataFrame con le informazioni sulle varietà
Returns
-------
pd.DataFrame
DataFrame completo con tutte le informazioni
"""
# Colonne base rilevanti
relevant_columns = [
'year', 'temp_mean', 'precip_sum', 'solar_energy_sum',
'ha', 'zone', 'olive_prod'
]
# Aggiungi colonne specifiche per varietà
all_varieties = olive_varieties['Varietà di Olive'].unique()
varieties = [clean_column_name(variety) for variety in all_varieties]
for variety in varieties:
relevant_columns.extend([
f'{variety}_olive_prod',
f'{variety}_tech'
])
# Seleziona solo le colonne rilevanti
full_data = simulated_data[relevant_columns].copy()
# Aggiungi feature calcolate
for variety in varieties:
# Calcola efficienza produttiva
if f'{variety}_olive_prod' in full_data.columns:
full_data[f'{variety}_efficiency'] = (
full_data[f'{variety}_olive_prod'] / full_data['ha']
)
# Aggiungi indicatori tecnici
if f'{variety}_tech' in full_data.columns:
technique_dummies = pd.get_dummies(
full_data[f'{variety}_tech'],
prefix=f'{variety}_technique'
)
full_data = pd.concat([full_data, technique_dummies], axis=1)
# Aggiungi feature temporali
full_data['month'] = 1 # Assumiamo dati annuali
full_data['day'] = 1 # Assumiamo dati annuali
# Calcola medie mobili
for col in ['temp_mean', 'precip_sum', 'solar_energy_sum']:
full_data[f'{col}_ma3'] = full_data[col].rolling(window=3, min_periods=1).mean()
full_data[f'{col}_ma5'] = full_data[col].rolling(window=5, min_periods=1).mean()
return full_data
def prepare_static_features_multiple(varieties_info: List[Dict],
percentages: List[float],
hectares: float,
all_varieties: List[str]) -> np.ndarray:
"""
Prepara le feature statiche per multiple varietà.
Parameters
----------
varieties_info : List[Dict]
Lista di dizionari contenenti le informazioni sulle varietà selezionate
percentages : List[float]
Lista delle percentuali corrispondenti a ciascuna varietà selezionata
hectares : float
Numero di ettari totali
all_varieties : List[str]
Lista di tutte le possibili varietà nel dataset originale
Returns
-------
np.ndarray
Array numpy contenente tutte le feature statiche
"""
# Inizializza un dizionario per tutte le varietà possibili
variety_data = {variety.lower(): {
'pct': 0,
'prod_t_ha': 0,
'tech': '',
'oil_prod_t_ha': 0,
'oil_prod_l_ha': 0,
'min_yield_pct': 0,
'max_yield_pct': 0,
'min_oil_prod_l_ha': 0,
'max_oil_prod_l_ha': 0,
'avg_oil_prod_l_ha': 0,
'l_per_t': 0,
'min_l_per_t': 0,
'max_l_per_t': 0,
'avg_l_per_t': 0,
'water_need_spring': 0,
'water_need_summer': 0,
'water_need_autumn': 0,
'water_need_winter': 0,
'annual_water_need': 0,
'optimal_temp': 0,
'drought_resistance': 0
} for variety in all_varieties}
# Aggiorna i dati per le varietà selezionate
for variety_info, percentage in zip(varieties_info, percentages):
variety_name = clean_column_name(variety_info['variet_di_olive']).lower()
technique = clean_column_name(variety_info['tecnica_di_coltivazione']).lower()
if variety_name not in variety_data:
print(f"Attenzione: La varietà '{variety_name}' non è presente nella lista delle varietà conosciute.")
continue
variety_data[variety_name].update({
'pct': percentage / 100,
'prod_t_ha': variety_info['produzione_tonnellateettaro'],
'tech': technique,
'oil_prod_t_ha': variety_info['produzione_olio_tonnellateettaro'],
'oil_prod_l_ha': variety_info['produzione_olio_litriettaro'],
'min_yield_pct': variety_info['min__resa'],
'max_yield_pct': variety_info['max__resa'],
'min_oil_prod_l_ha': variety_info['min_produzione_olio_litriettaro'],
'max_oil_prod_l_ha': variety_info['max_produzione_olio_litriettaro'],
'avg_oil_prod_l_ha': variety_info['media_produzione_olio_litriettaro'],
'l_per_t': variety_info['litri_per_tonnellata'],
'min_l_per_t': variety_info['min_litri_per_tonnellata'],
'max_l_per_t': variety_info['max_litri_per_tonnellata'],
'avg_l_per_t': variety_info['media_litri_per_tonnellata'],
'water_need_spring': variety_info['fabbisogno_acqua_primavera_mettaro'],
'water_need_summer': variety_info['fabbisogno_acqua_estate_mettaro'],
'water_need_autumn': variety_info['fabbisogno_acqua_autunno_mettaro'],
'water_need_winter': variety_info['fabbisogno_acqua_inverno_mettaro'],
'annual_water_need': variety_info['fabbisogno_idrico_annuale_mettaro'],
'optimal_temp': variety_info['temperatura_ottimale'],
'drought_resistance': variety_info['resistenza_alla_siccit']
})
# Crea il vettore delle feature
static_features = [hectares]
# Lista delle feature per ogni varietà
variety_features = ['pct', 'prod_t_ha', 'oil_prod_t_ha', 'oil_prod_l_ha',
'min_yield_pct', 'max_yield_pct', 'min_oil_prod_l_ha',
'max_oil_prod_l_ha', 'avg_oil_prod_l_ha', 'l_per_t',
'min_l_per_t', 'max_l_per_t', 'avg_l_per_t',
'water_need_spring', 'water_need_summer', 'water_need_autumn',
'water_need_winter', 'annual_water_need', 'optimal_temp',
'drought_resistance']
# Appiattisci i dati delle varietà
for variety in all_varieties:
variety_lower = variety.lower()
# Feature esistenti
for feature in variety_features:
static_features.append(variety_data[variety_lower][feature])
# Feature binarie per le tecniche
for technique in ['tradizionale', 'intensiva', 'superintensiva']:
static_features.append(1 if variety_data[variety_lower]['tech'] == technique else 0)
return np.array(static_features).reshape(1, -1)
def get_feature_names(all_varieties: List[str]) -> List[str]:
"""
Genera i nomi delle feature nell'ordine corretto.
Parameters
----------
all_varieties : List[str]
Lista di tutte le varietà possibili
Returns
-------
List[str]
Lista dei nomi delle feature
"""
feature_names = ['hectares']
variety_features = ['pct', 'prod_t_ha', 'oil_prod_t_ha', 'oil_prod_l_ha',
'min_yield_pct', 'max_yield_pct', 'min_oil_prod_l_ha',
'max_oil_prod_l_ha', 'avg_oil_prod_l_ha', 'l_per_t',
'min_l_per_t', 'max_l_per_t', 'avg_l_per_t']
techniques = ['tradizionale', 'intensiva', 'superintensiva']
for variety in all_varieties:
for feature in variety_features:
feature_names.append(f"{variety}_{feature}")
for technique in techniques:
feature_names.append(f"{variety}_tech_{technique}")
return feature_names
def add_controlled_variation(base_value: float, max_variation_pct: float = 0.20) -> float:
"""
Aggiunge una variazione controllata a un valore base.
Parameters
----------
base_value : float
Valore base da modificare
max_variation_pct : float
Percentuale massima di variazione (default 20%)
Returns
-------
float
Valore con variazione applicata
"""
variation = np.random.uniform(-max_variation_pct, max_variation_pct)
return base_value * (1 + variation)
def get_growth_phase(month):
if month in [12, 1, 2]:
return 'dormancy'
elif month in [3, 4, 5]:
return 'flowering'
elif month in [6, 7, 8]:
return 'fruit_set'
else:
return 'ripening'
def calculate_weather_effect(row, optimal_temp):
# Effetti base
temp_effect = -0.1 * (row['temp_mean'] - optimal_temp) ** 2
rain_effect = -0.05 * (row['precip_sum'] - 600) ** 2 / 10000
sun_effect = 0.1 * row['solarenergy_sum'] / 1000
# Fattori di scala basati sulla fase di crescita
if row['growth_phase'] == 'dormancy':
temp_scale = 0.5
rain_scale = 0.2
sun_scale = 0.1
elif row['growth_phase'] == 'flowering':
temp_scale = 2.0
rain_scale = 1.5
sun_scale = 1.0
elif row['growth_phase'] == 'fruit_set':
temp_scale = 1.5
rain_scale = 1.0
sun_scale = 0.8
else: # ripening
temp_scale = 1.0
rain_scale = 0.5
sun_scale = 1.2
# Calcolo dell'effetto combinato
combined_effect = (
temp_scale * temp_effect +
rain_scale * rain_effect +
sun_scale * sun_effect
)
# Aggiustamenti specifici per fase
if row['growth_phase'] == 'flowering':
combined_effect -= 0.5 * max(0, row['precip_sum'] - 50) # Penalità per pioggia eccessiva durante la fioritura
elif row['growth_phase'] == 'fruit_set':
combined_effect += 0.3 * max(0, row['temp_mean'] - (optimal_temp + 5)) # Bonus per temperature più alte durante la formazione dei frutti
return combined_effect
def calculate_water_need(weather_data, base_need, optimal_temp):
# Calcola il fabbisogno idrico basato su temperatura e precipitazioni
temp_factor = 1 + 0.05 * (weather_data['temp_mean'] - optimal_temp) # Aumenta del 5% per ogni grado sopra l'ottimale
rain_factor = 1 - 0.001 * weather_data['precip_sum'] # Diminuisce leggermente con l'aumentare delle precipitazioni
return base_need * temp_factor * rain_factor
def create_technique_mapping(olive_varieties, mapping_path='./sources/technique_mapping.joblib'):
# Estrai tutte le tecniche uniche dal dataset e convertile in lowercase
all_techniques = olive_varieties['Tecnica di Coltivazione'].str.lower().unique()
# Crea il mapping partendo da 1
technique_mapping = {tech: i + 1 for i, tech in enumerate(sorted(all_techniques))}
# Salva il mapping
os.makedirs(os.path.dirname(mapping_path), exist_ok=True)
joblib.dump(technique_mapping, mapping_path)
return technique_mapping
def encode_techniques(df, mapping_path='./sources/technique_mapping.joblib'):
if not os.path.exists(mapping_path):
raise FileNotFoundError(f"Mapping not found at {mapping_path}. Run create_technique_mapping first.")
technique_mapping = joblib.load(mapping_path)
# Trova tutte le colonne delle tecniche
tech_columns = [col for col in df.columns if col.endswith('_tech')]
# Applica il mapping a tutte le colonne delle tecniche
for col in tech_columns:
df[col] = df[col].str.lower().map(technique_mapping).fillna(0).astype(int)
return df
def decode_techniques(df, mapping_path='./sources/technique_mapping.joblib'):
if not os.path.exists(mapping_path):
raise FileNotFoundError(f"Mapping not found at {mapping_path}")
technique_mapping = joblib.load(mapping_path)
reverse_mapping = {v: k for k, v in technique_mapping.items()}
reverse_mapping[0] = '' # Aggiungi un mapping per 0 a stringa vuota
# Trova tutte le colonne delle tecniche
tech_columns = [col for col in df.columns if col.endswith('_tech')]
# Applica il reverse mapping a tutte le colonne delle tecniche
for col in tech_columns:
df[col] = df[col].map(reverse_mapping)
return df
def decode_single_technique(technique_value, mapping_path='./sources/technique_mapping.joblib'):
if not os.path.exists(mapping_path):
raise FileNotFoundError(f"Mapping not found at {mapping_path}")
technique_mapping = joblib.load(mapping_path)
reverse_mapping = {v: k for k, v in technique_mapping.items()}
reverse_mapping[0] = ''
return reverse_mapping.get(technique_value, '')
def preprocess_weather_data(weather_df):
# Calcola statistiche mensili per ogni anno
monthly_weather = weather_df.groupby(['year', 'month']).agg({
'temp': ['mean', 'min', 'max'],
'humidity': 'mean',
'precip': 'sum',
'windspeed': 'mean',
'cloudcover': 'mean',
'solarradiation': 'sum',
'solarenergy': 'sum',
'uvindex': 'max'
}).reset_index()
monthly_weather.columns = ['year', 'month'] + [f'{col[0]}_{col[1]}' for col in monthly_weather.columns[2:]]
return monthly_weather