# ==============================================================================
# PREDICTIVE LOTTO SYSTEM IN PYTHON WITH REINFORCEMENT LEARNING
# Author: Google's Gemini model, at Tom's request :)
# Version: 1.4 - Checkpointing, training resumption and maximum robustness <- "lizard" effect
#
# DESCRIPTION:
# Script for predictive analysis of lotto draws using a Reinforcement Learning
# (RL) approach with self-optimization.
# It saves its progress after every episode and can resume an interrupted
# training run. <- like when the lizard only loses.. its tail..
#
# DISCLAIMER:
# The Lotto is a random process. This script is an academic and AI-programming
# exercise and does not guarantee any winnings. Play responsibly.
# ==============================================================================
import os
# --- Suppress TensorFlow informational messages (must be set before importing tensorflow) ---
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from collections import deque
import random
import logging
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics.pairwise import cosine_similarity
# --- Logging setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- GLOBAL CONFIGURATION ---
NUM_NUMBERS = 90
NUM_EXTRACTED = 5
STATE_WINDOW_SIZE = 50
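# Training episodes start at index STATE_WINDOW_SIZE, so the first draws act as a
# warm-up window for the 50-draw rolling-average features computed during feature engineering.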
# ==============================================================================
# ====> EDIT THE WHEEL AND THE TRAINING PARAMETERS HERE <====
# ==============================================================================
WHEEL_NAME_TO_ANALYZE = 'Firenze'
TOTAL_EPISODES = 100 # Total number of training episodes
BATCH_SIZE = 128 # Batch size for experience replay
# ==============================================================================
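# RL environment over the historical draws: the state is the scaled feature vector of the
# current draw, the action is an index 0-89 (i.e. number - 1), and the reward is +1.0 if the
# chosen number appears in the next draw, otherwise -0.1.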
class LottoRLEnvironment:
def __init__(self, data_features, data_actual_draws):
self.features = data_features
self.actual_draws = data_actual_draws
self.n_steps = len(data_features)
self.current_step = STATE_WINDOW_SIZE
def reset(self):
self.current_step = STATE_WINDOW_SIZE
return self.features[self.current_step]
def step(self, action):
self.current_step += 1
done = self.current_step >= self.n_steps -1
predicted_number = action + 1
actual_next_draw = self.actual_draws[self.current_step]
reward = 1.0 if predicted_number in actual_next_draw else -0.1
next_state = self.features[self.current_step] if not done else None
return next_state, reward, done
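# Deep Q-Network agent: epsilon-greedy exploration, an experience-replay buffer (deque) and a
# separate target network used to compute the bootstrapped Q-targets.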
class DQNAgent:
def __init__(self, state_size, action_size, params):
self.state_size = state_size
self.action_size = action_size
self.memory = deque(maxlen=20000)
self.gamma = params.get('gamma', 0.95)
self.epsilon = params.get('epsilon', 1.0)
self.epsilon_min = 0.01
self.epsilon_decay = params.get('epsilon_decay', 0.999)
self.learning_rate = params.get('learning_rate', 0.001)
self.model = self._build_model()
self.target_model = self._build_model()
self.update_target_model()
def _build_model(self):
model = Sequential([
Dense(128, input_dim=self.state_size, activation='relu'),
BatchNormalization(), Dropout(0.3),
Dense(128, activation='relu'),
BatchNormalization(), Dropout(0.3),
Dense(self.action_size, activation='linear')
])
model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
return model
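# Copy the online network's weights into the target network.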
def update_target_model(self):
self.target_model.set_weights(self.model.get_weights())
def remember(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))
def act(self, state):
if np.random.rand() <= self.epsilon:
return random.randrange(self.action_size)
act_values = self.model.predict(state, verbose=0)
return np.argmax(act_values[0])
def replay(self, batch_size):
if len(self.memory) < batch_size: return
minibatch = random.sample(self.memory, batch_size)
states, targets_f = [], []
for state, action, reward, next_state, done in minibatch:
target = reward
if not done:
q_next = self.target_model.predict(next_state, verbose=0)[0]
target = reward + self.gamma * np.amax(q_next)
target_f = self.model.predict(state, verbose=0)
target_f[0][action] = target
states.append(state[0])
targets_f.append(target_f[0])
self.model.fit(np.array(states), np.array(targets_f), epochs=1, verbose=0)
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
def save(self, name):
self.model.save(name)
def load(self, name):
self.model.load_weights(name)
self.update_target_model()
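# Orchestrates the full pipeline for a single wheel: data loading and preprocessing, feature
# engineering, (resumable) training, historical backtesting and prediction of the next draw.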
class LottoRLPredictor:
def __init__(self, wheel_name):
self.wheel_name = wheel_name
self.df = None
self.all_draws = []
self.features = None
self.agent = None
self.scaler = MinMaxScaler()
self.algorithms = {'DQN': {'agent': DQNAgent, 'params': {'learning_rate': 0.001, 'gamma': 0.95, 'epsilon_decay': 0.999}}}
self.best_algorithm_name = None
self.best_params = None
self.historical_accuracy_report = {}
self.training_rewards = []
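# If the CSV is missing, generate a synthetic dataset in the expected format so the script can
# still run end to end (useful for testing only: the generated draws are random).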
def generate_dummy_data(self, file_path, rows=10700):
if not os.path.exists(file_path):
logging.warning(f"File {file_path} non trovato. Generazione di un dataset di esempio.")
header = 'Num;Estrazione;Ruota;Numeri;Ritardo;Sortita;Estratti;'
data = []
start_date = pd.to_datetime('1871-01-01')
for i in range(rows):
date = start_date + pd.DateOffset(days=i*5)
numbers = sorted(random.sample(range(1, NUM_NUMBERS + 1), NUM_EXTRACTED))
numeri_str_dot = '.'.join([f'{n:02}' for n in numbers])
numeri_str_space = ' '.join([f'{n:02}' for n in numbers])
row_str = f"{i};{i+1} - {date.strftime('%d/%m/%Y')};{self.wheel_name};{numeri_str_dot};0;Cinquina;{numeri_str_space} ;"
data.append(row_str)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(header + '\n')
for row in data:
f.write(row + '\n')
logging.info(f"Dataset di esempio generato e salvato in {file_path}")
def load_and_preprocess_data(self, file_path):
logging.info(f"Caricamento dati da {file_path} per la ruota '{self.wheel_name}'...")
try:
df_temp = pd.read_csv(file_path, sep=';', header=None, skiprows=1, dtype=str, on_bad_lines='skip', encoding='utf-8')
column_names = ['Num', 'Estrazione', 'Ruota', 'Numeri', 'Ritardo', 'Sortita', 'Estratti']
num_cols_to_assign = min(len(column_names), df_temp.shape[1])
self.df = df_temp.iloc[:, :num_cols_to_assign]
self.df.columns = column_names[:num_cols_to_assign]
except Exception as e:
logging.error(f"Errore durante la lettura del CSV: {e}"); return
self.df['Ruota'] = self.df['Ruota'].str.strip().str.title()
self.wheel_name = self.wheel_name.strip().title()
self.df = self.df[self.df['Ruota'] == self.wheel_name].copy()
if self.df.empty:
logging.error(f"Nessuna estrazione trovata per la ruota '{self.wheel_name}'."); self.df = None; return
self.df['Data'] = pd.to_datetime(self.df['Estrazione'].str.split(' - ').str[1], format='%d/%m/%Y', errors='coerce')
num_col = 'Estratti' if 'Estratti' in self.df.columns else 'Numeri'
sep = ' ' if num_col == 'Estratti' else '.'
self.df.dropna(subset=[num_col, 'Data'], inplace=True)
self.df['Parsed_Numeri'] = self.df[num_col].astype(str).str.strip().str.split(sep).apply(lambda x: [int(n) for n in x if n.isdigit()])
self.df = self.df[self.df['Parsed_Numeri'].apply(len) == NUM_EXTRACTED]
self.df.sort_values('Data', inplace=True); self.df.reset_index(drop=True, inplace=True)
self.all_draws = self.df['Parsed_Numeri'].tolist()
logging.info(f"Caricate {len(self.df)} estrazioni per la ruota '{self.wheel_name}'.")
self._feature_engineering()
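# Features per draw: for each of the 90 numbers, the current delay (draws since its last
# appearance) plus a 50-draw rolling mean of that delay, scaled to [0, 1] with MinMaxScaler.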
def _feature_engineering(self):
logging.info("Inizio Feature Engineering...")
delays = np.zeros((len(self.df), NUM_NUMBERS), dtype=int)
last_seen = -np.ones(NUM_NUMBERS + 1, dtype=int)
for i, draw in enumerate(tqdm(self.all_draws, desc="Calcolo Features")):
for num in range(1, NUM_NUMBERS + 1):
delays[i, num - 1] = i - last_seen[num] if last_seen[num] != -1 else i + 1
for extracted_num in draw:
last_seen[extracted_num] = i
delay_df = pd.DataFrame(delays)
ma_delay_50 = delay_df.rolling(window=50).mean().fillna(0)
self.features_df = pd.concat([delay_df.add_prefix('delay_'), ma_delay_50.add_prefix('ma_delay_50_')], axis=1)
self.features = self.scaler.fit_transform(self.features_df)
logging.info(f"Feature engineering completato. Shape: {self.features.shape}")
def auto_select_algorithm(self):
logging.info("Selezione automatica dell'algoritmo migliore (Default: DQN)...")
self.best_algorithm_name = 'DQN'
self.best_params = self.algorithms['DQN']['params']
def train_model(self, episodes, batch_size):
if self.features is None: logging.error("Dati non caricati."); return
state_size = self.features.shape[1]
action_size = NUM_NUMBERS
agent_class = self.algorithms[self.best_algorithm_name]['agent']
self.agent = agent_class(state_size, action_size, self.best_params)
model_filename = f"lotto_model_{self.wheel_name.lower()}.h5"
if os.path.exists(model_filename):
logging.info(f"Trovato modello pre-esistente. Caricamento di '{model_filename}' per riprendere il training.")
self.agent.load(model_filename)
logging.info(f"Inizio training per {episodes} episodi...")
env = LottoRLEnvironment(self.features, self.all_draws)
for e in range(episodes):
state = env.reset()
state = np.reshape(state, [1, state_size])
total_reward = 0
pbar = tqdm(range(STATE_WINDOW_SIZE, env.n_steps - 1), desc=f"Episodio {e+1}/{episodes}", unit="estrazione")
for _ in pbar:
action = self.agent.act(state)
next_state, reward, done = env.step(action)
total_reward += reward
if not done:
next_state = np.reshape(next_state, [1, state_size])
self.agent.remember(state, action, reward, next_state, done)
state = next_state
if done: break
if len(self.agent.memory) > batch_size: self.agent.replay(batch_size)
pbar.set_postfix({"Reward": f"{total_reward:.1f}", "Epsilon": f"{self.agent.epsilon:.3f}"})
self.training_rewards.append(total_reward)
self.agent.save(model_filename)
logging.info(f"Fine Episodio {e+1}/{episodes} - Reward: {total_reward:.2f} - Modello salvato in '{model_filename}'")
logging.info("Sessione di training completata.")
def calculate_max_delay_and_remaining(self, number):
binary_occurrence = self.df['Parsed_Numeri'].apply(lambda x: number in x)
if not binary_occurrence.any(): return 0, 0, 0
indices = binary_occurrence[binary_occurrence].index
delays = np.diff(indices)
max_delay = delays.max() if len(delays) > 0 else 0
current_delay = len(self.df) - 1 - indices.max()
remaining = max(0, max_delay - current_delay)
return int(max_delay), int(current_delay), int(remaining)
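# Find the past draw whose feature vector is most similar (cosine similarity) to the current state.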
def find_analogous_conditions(self, current_state_vector):
similarities = cosine_similarity(current_state_vector, self.features[:-1])
best_match_index = np.argsort(similarities[0])[-1]
analogous_draw_date = self.df.loc[best_match_index, 'Data']
return f"Estrazione del {analogous_draw_date.strftime('%d/%m/%Y')} (similarità : {similarities[0][best_match_index]:.2f})"
def evaluate_historical_accuracy(self, test_period_size=100):
logging.info("Valutazione accuratezza storica (backtesting)...")
if self.agent is None: logging.error("Modello non addestrato."); return
test_start_index = len(self.features) - test_period_size
# The last state has no following draw to score, so the number of evaluated predictions is test_period_size - 1.
n_tests = max(1, len(self.features) - 1 - test_start_index)
hits_top1, hits_top5 = 0, 0
for i in tqdm(range(test_start_index, len(self.features) - 1), desc="Backtesting"):
state = np.reshape(self.features[i], [1, self.features.shape[1]])
q_values = self.agent.model.predict(state, verbose=0)[0]
predicted_numbers_top5 = np.argsort(q_values)[-NUM_EXTRACTED:][::-1] + 1
actual_numbers = self.all_draws[i + 1]
if predicted_numbers_top5[0] in actual_numbers: hits_top1 += 1
if any(num in actual_numbers for num in predicted_numbers_top5): hits_top5 += 1
self.historical_accuracy_report = {
"Top-1 Accuracy": (hits_top1 / n_tests) * 100, "Top-5 Accuracy": (hits_top5 / n_tests) * 100, "Test Period": n_tests
}
logging.info(f"Accuratezza Top-1: {self.historical_accuracy_report['Top-1 Accuracy']:.2f}%, Top-5: {self.historical_accuracy_report['Top-5 Accuracy']:.2f}%")
return self.historical_accuracy_report
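# Score all 90 numbers from the Q-values of the latest state and report the top 5. The
# "Confidenza" field is a min-max normalization of the Q-values, not a calibrated probability.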
def predict_next_extraction(self):
if self.agent is None: logging.error("Modello non addestrato."); return None
logging.info("Generazione predizioni per la prossima estrazione...")
last_state = np.reshape(self.features[-1], [1, self.features.shape[1]])
q_values = self.agent.model.predict(last_state, verbose=0)[0]
sorted_actions = np.argsort(q_values)[::-1]
q_min, q_max = q_values.min(), q_values.max()
confidence_scores = (q_values - q_min) / (q_max - q_min + 1e-9) * 100
predictions = []
for i in range(NUM_EXTRACTED):
action = sorted_actions[i]; number = action + 1
max_delay, _, remaining = self.calculate_max_delay_and_remaining(number)
predictions.append({
"Numero Predetto": number, "Colpo Massimo Storico": max_delay, "Colpi Rimanenti Stimati": remaining,
"% Successo Passato": f"{self.historical_accuracy_report.get('Top-5 Accuracy', 0.0):.2f}%",
"Condizione Analoga Migliore": self.find_analogous_conditions(last_state),
"Confidenza": f"{confidence_scores[action]:.2f}%"
})
return predictions
def display_results(self, predictions):
if not predictions: logging.warning("Nessuna predizione da mostrare."); return
print(f"\n{'='*60}\nPREDIZIONI PROSSIMA ESTRAZIONE - RUOTA [{self.wheel_name.upper()}]\n{'='*60}\n")
for i, p in enumerate(predictions):
print(f"ESTRATTO {i+1}:")
for key, value in p.items(): print(f"- {key}: {value}")
print("-" * 30)
print(f"\n{'='*60}\nPERFORMANCE ALGORITMO\n{'='*60}")
print(f"- Algoritmo Selezionato: {self.best_algorithm_name}")
if self.historical_accuracy_report:
print(f"- Accuratezza Media (Top-5): {self.historical_accuracy_report['Top-5 Accuracy']:.2f}%")
plt.figure(figsize=(12, 6))
plt.plot(self.training_rewards); plt.title(f'Reward Totale per Episodio - Ruota di {self.wheel_name}')
plt.xlabel('Episodio'); plt.ylabel('Reward Totale Accumulato'); plt.grid(True); plt.tight_layout()
plt.savefig(f'training_rewards_{self.wheel_name.lower()}.png')
print(f"\nGrafico performance salvato come 'training_rewards_{self.wheel_name.lower()}.png'")
plt.show()
# ==============================================================================
# --- MAIN EXECUTION FLOW ---
# ==============================================================================
if __name__ == '__main__':
# You can change the dataset file name here if needed
DATASET_FILE = 'estrazioni_lotto_1871_2025.csv'
predictor = LottoRLPredictor(wheel_name=WHEEL_NAME_TO_ANALYZE)
if not os.path.exists(DATASET_FILE):
predictor.generate_dummy_data(DATASET_FILE)
predictor.load_and_preprocess_data(DATASET_FILE)
if predictor.df is not None and not predictor.df.empty:
# Select the algorithm (a simple step for now)
predictor.auto_select_algorithm()
# Train the model (it will load and resume an existing saved model if one is found)
predictor.train_model(episodes=TOTAL_EPISODES, batch_size=BATCH_SIZE)
# Evaluate and predict
predictor.evaluate_historical_accuracy(test_period_size=min(100, len(predictor.df) - STATE_WINDOW_SIZE - 2))
final_predictions = predictor.predict_next_extraction()
predictor.display_results(final_predictions)
else:
logging.error("Impossibile procedere: nessun dato valido caricato per la ruota specificata.")
# The dataset must follow the format shown in the example below (i.e. the export produced by
# Spaziometria, "statistica veloce" section, bottom left). A different dataset format can also
# be used, provided the corresponding parsing code is adapted accordingly...
# Num;Estrazione;Ruota;Numeri;Ritardo;Sortita;Estratti;
# 10581;10588 - 30/01/2025;Firenze;04.63.56.34.90;0;Cinquina;04 63 56 34 90 ;
# 10582;10589 - 31/01/2025;Firenze;38.51.15.50.56;0;Cinquina;38 51 15 50 56 ;
# 10583;10590 - 01/02/2025;Firenze;73.63.57.74.86;0;Cinquina;73 63 57 74 86 ;
# 10584;10591 - 04/02/2025;Firenze;20.33.60.73.87;0;Cinquina;20 33 60 73 87 ;
# 10585;10592 - 06/02/2025;Firenze;59.29.75.36.73;0;Cinquina;59 29 75 36 73 ;
# 10586;10593 - 07/02/2025;Firenze;01.31.73.34.37;0;Cinquina;01 31 73 34 37 ;
# 10587;10594 - 08/02/2025;Firenze;24.45.62.77.76;0;Cinquina;24 45 62 77 76 ;
# 10588;10595 - 11/02/2025;Firenze;04.57.11.79.12;0;Cinquina;04 57 11 79 12 ;
# 10589;10596 - 13/02/2025;Firenze;21.57.56.04.44;0;Cinquina;21 57 56 04 44 ;
# 10590;10597 - 14/02/2025;Firenze;21.87.82.53.31;0;Cinquina;21 87 82 53 31 ;
# 10591;10598 - 15/02/2025;Firenze;77.37.02.57.36;0;Cinquina;77 37 02 57 36 ;
# 10592;10599 - 18/02/2025;Firenze;11.68.62.04.21;0;Cinquina;11 68 62 04 21 ;
# 10593;10600 - 20/02/2025;Firenze;60.80.86.48.19;0;Cinquina;60 80 86 48 19 ;
# 10594;10601 - 21/02/2025;Firenze;54.01.37.47.04;0;Cinquina;54 01 37 47 04 ;
# 10595;10602 - 22/02/2025;Firenze;42.38.70.71.80;0;Cinquina;42 38 70 71 80 ;
# ...