# ==============================================================================
# SISTEMA PREDITTIVO LOTTO CON REINFORCEMENT LEARNING
# Autore: Modello Gemini di Google + Humano (tom) :)
# Versione: 1.5 - Specialista di Gregge numerico
#
# DESCRIZIONE:
# Script specializzato per analizzare un gruppo ristretto di numeri ("gregge")
# e predire un ambo. Include checkpointing e ripresa del training.
#
# DISCLAIMER:
# Il gioco del Lotto è un processo casuale. Questo script è un esercizio
# accademico e non garantisce alcuna vincita. Giocare responsabilmente.
# ==============================================================================
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from collections import deque
import random
import os
import logging
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics.pairwise import cosine_similarity
# --- Sopprimi i messaggi informativi di TensorFlow ---
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
# --- Setup del Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# ==============================================================================
# ====> CONFIGURAZIONE DELLO STUDIO SUL GREGGE <====
# ==============================================================================
# 1. DEFINISCI IL TUO GREGGE DI NUMERI #es. c9 abs x s1 su TT e NZ unite by 8117
# GREGGE_SPECIALE = [
# 19.32.41.47.49.54.56.82.90 <- METTI QUI il tuo GRUPPO BASE NUMERICO (ovviamente togli i # dei commenti per attivare questo pezzo di code)
# ]
# 2. DEFINISCI LA RUOTA DA ANALIZZARE (il "pascolo")
WHEEL_NAME_TO_ANALYZE = 'Napoli' # Esempio, cambiala con la tua scelta
# 3. IMPOSTA I PARAMETRI DI TRAINING
TOTAL_EPISODES = 1 # Imposta a 1 per il training notturno incrementale
BATCH_SIZE = 64 # Dimensione del batch, 64 è un buon compromesso
NUM_PREDICTED_AMBO = 2 # L'obiettivo è predire 2 numeri (un ambo)
# --- Costanti derivate dalla configurazione ---
NUM_GREGGE = len(GREGGE_SPECIALE)
STATE_WINDOW_SIZE = 50
# ==============================================================================
class LottoRLEnvironment:
def __init__(self, data_features, data_actual_draws):
self.features = data_features
self.actual_draws = data_actual_draws
self.n_steps = len(data_features)
self.current_step = STATE_WINDOW_SIZE
def reset(self):
self.current_step = STATE_WINDOW_SIZE
return self.features[self.current_step]
def step(self, actions): # NOTA: Ora accetta più azioni (le 2 predizioni)
self.current_step += 1
done = self.current_step >= self.n_steps - 1
predicted_numbers = [GREGGE_SPECIALE[a] for a in actions]
actual_next_draw = self.actual_draws[self.current_step]
# --- Nuova Reward Function per l'Ambo ---
hits = sum(1 for num in predicted_numbers if num in actual_next_draw)
if hits == 2:
reward = 10.0 # Grande ricompensa per l'ambo secco!
elif hits == 1:
reward = 1.0 # Piccola ricompensa per un estratto
else:
reward = -0.1 # Piccola penalitÃ
next_state = self.features[self.current_step] if not done else None
return next_state, reward, done
class DQNAgent:
# ... (La classe DQNAgent rimane quasi identica, le modifiche sono nel flusso) ...
def __init__(self, state_size, action_size, params):
self.state_size = state_size
self.action_size = action_size # Ora sarà 29
self.memory = deque(maxlen=20000)
self.gamma = params.get('gamma', 0.95)
self.epsilon = params.get('epsilon', 1.0)
self.epsilon_min = 0.01
self.epsilon_decay = params.get('epsilon_decay', 0.999)
self.learning_rate = params.get('learning_rate', 0.001)
self.model = self._build_model()
self.target_model = self._build_model()
self.update_target_model()
def _build_model(self):
model = Sequential([
Dense(128, input_dim=self.state_size, activation='relu'),
BatchNormalization(), Dropout(0.3),
Dense(128, activation='relu'),
BatchNormalization(), Dropout(0.3),
Dense(self.action_size, activation='linear') # Output di 29 neuroni
])
model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
return model
def update_target_model(self):
self.target_model.set_weights(self.model.get_weights())
def remember(self, state, action, reward, next_state, done):
# NOTA: Ora salviamo una singola azione scelta, non l'ambo
self.memory.append((state, action, reward, next_state, done))
def act(self, state, num_to_predict):
if np.random.rand() <= self.epsilon:
# Sceglie N azioni casuali e uniche
return np.random.choice(self.action_size, num_to_predict, replace=False)
# Sceglie le N migliori azioni
act_values = self.model.predict(state, verbose=0)[0]
return np.argsort(act_values)[-num_to_predict:][::-1] # Indici delle N migliori azioni
def replay(self, batch_size):
if len(self.memory) < batch_size: return
minibatch = random.sample(self.memory, batch_size)
# Il replay avviene su ogni singola azione/numero predetto
for state, action, reward, next_state, done in minibatch:
target = reward
if not done:
q_next = self.target_model.predict(next_state, verbose=0)[0]
target = reward + self.gamma * np.amax(q_next)
target_f = self.model.predict(state, verbose=0)
target_f[0][action] = target
self.model.fit(state, target_f, epochs=1, verbose=0)
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
def save(self, name):
self.model.save(name)
def load(self, name):
self.model.load_weights(name)
self.update_target_model()
class LottoRLPredictor:
def __init__(self, wheel_name):
self.wheel_name = wheel_name
self.df = None
self.all_draws = []
self.features = None
self.agent = None
self.scaler = MinMaxScaler()
self.algorithms = {'DQN': {'agent': DQNAgent, 'params': {'learning_rate': 0.001, 'gamma': 0.95, 'epsilon_decay': 0.9995}}}
self.best_algorithm_name = None
self.best_params = None
self.historical_accuracy_report = {}
self.training_rewards = []
def load_and_preprocess_data(self, file_path):
logging.info(f"Caricamento dati da {file_path} per la ruota '{self.wheel_name}'...")
try:
df_temp = pd.read_csv(file_path, sep=';', header=None, skiprows=1, dtype=str, on_bad_lines='skip', encoding='utf-8')
column_names = ['Num', 'Estrazione', 'Ruota', 'Numeri', 'Ritardo', 'Sortita', 'Estratti']
num_cols_to_assign = min(len(column_names), df_temp.shape[1])
self.df = df_temp.iloc[:, :num_cols_to_assign]
self.df.columns = column_names[:num_cols_to_assign]
except Exception as e:
logging.error(f"Errore durante la lettura del CSV: {e}"); return
self.df['Ruota'] = self.df['Ruota'].str.strip().str.title()
self.wheel_name = self.wheel_name.strip().title()
self.df = self.df[self.df['Ruota'] == self.wheel_name].copy()
if self.df.empty:
logging.error(f"Nessuna estrazione trovata per la ruota '{self.wheel_name}'."); self.df = None; return
self.df['Data'] = pd.to_datetime(self.df['Estrazione'].str.split(' - ').str[1], format='%d/%m/%Y', errors='coerce')
num_col = 'Estratti' if 'Estratti' in self.df.columns else 'Numeri'
sep = ' ' if num_col == 'Estratti' else '.'
self.df.dropna(subset=[num_col, 'Data'], inplace=True)
self.df['Parsed_Numeri'] = self.df[num_col].astype(str).str.strip().str.split(sep).apply(lambda x: [int(n) for n in x if n.isdigit()])
self.df.sort_values('Data', inplace=True); self.df.reset_index(drop=True, inplace=True)
self.all_draws = self.df['Parsed_Numeri'].tolist()
logging.info(f"Caricate {len(self.df)} estrazioni per la ruota '{self.wheel_name}'.")
self._feature_engineering()
def _feature_engineering(self):
logging.info("Inizio Feature Engineering per il Gregge Speciale...")
# Le features ora sono calcolate solo per i numeri del gregge
delays = np.zeros((len(self.df), NUM_GREGGE), dtype=int)
last_seen = {num: -1 for num in GREGGE_SPECIALE}
for i, draw in enumerate(tqdm(self.all_draws, desc="Calcolo Features Gregge")):
for idx, num in enumerate(GREGGE_SPECIALE):
delays[i, idx] = i - last_seen[num] if last_seen[num] != -1 else i + 1
for extracted_num in draw:
if extracted_num in last_seen:
last_seen[extracted_num] = i
delay_df = pd.DataFrame(delays)
ma_delay_50 = delay_df.rolling(window=50).mean().fillna(0)
self.features_df = pd.concat([delay_df.add_prefix('delay_'), ma_delay_50.add_prefix('ma_delay_50_')], axis=1)
self.features = self.scaler.fit_transform(self.features_df)
logging.info(f"Feature engineering completato. Shape: {self.features.shape}")
def train_model(self, episodes, batch_size):
if self.features is None: logging.error("Dati non caricati."); return
state_size = self.features.shape[1]
action_size = NUM_GREGGE # Azioni possibili = numero di pecore nel gregge
self.agent = DQNAgent(state_size, action_size, self.algorithms['DQN']['params'])
model_filename = f"gregge_model_{self.wheel_name.lower()}.h5"
if os.path.exists(model_filename):
logging.info(f"Trovato modello pre-esistente. Caricamento di '{model_filename}' per riprendere il training.")
self.agent.load(model_filename)
logging.info(f"Inizio training per {episodes} episodi...")
env = LottoRLEnvironment(self.features, self.all_draws)
for e in range(episodes):
state = env.reset()
state = np.reshape(state, [1, state_size])
total_reward = 0
pbar = tqdm(range(STATE_WINDOW_SIZE, env.n_steps - 1), desc=f"Episodio {e+1}/{episodes}", unit="estrazione")
for _ in pbar:
# L'agente sceglie le N migliori azioni (indici del gregge)
actions = self.agent.act(state, NUM_PREDICTED_AMBO)
# Passiamo le azioni all'ambiente e otteniamo un reward per la coppia
next_state, reward, done = env.step(actions)
total_reward += reward
# Per l'apprendimento, salviamo ogni azione separatamente
if not done:
next_state = np.reshape(next_state, [1, state_size])
# Semplificazione: diamo lo stesso reward a entrambe le azioni scelte
for action in actions:
self.agent.remember(state, action, reward, next_state, done)
state = next_state
if done: break
# Il replay è stato semplificato per gestire azione singola
if len(self.agent.memory) > batch_size:
self.agent.replay(batch_size)
pbar.set_postfix({"Reward": f"{total_reward:.1f}", "Epsilon": f"{self.agent.epsilon:.3f}"})
self.training_rewards.append(total_reward)
self.agent.save(model_filename)
logging.info(f"Fine Episodio {e+1}/{episodes} - Reward: {total_reward:.2f} - Modello salvato in '{model_filename}'")
logging.info("Sessione di training completata.")
def predict_next_extraction(self):
if self.agent is None: logging.error("Modello non addestrato."); return None
logging.info("Generazione predizione AMBO per il gregge...")
last_state = np.reshape(self.features[-1], [1, self.features.shape[1]])
# L'agente predice gli indici delle migliori N azioni
predicted_indices = self.agent.act(last_state, NUM_PREDICTED_AMBO)
# Mappiamo gli indici ai numeri reali del gregge
predicted_numbers = [GREGGE_SPECIALE[i] for i in predicted_indices]
return predicted_numbers
def display_results(self, predictions):
if not predictions: logging.warning("Nessuna predizione da mostrare."); return
print(f"\n{'='*60}\nPREDIZIONE PROSSIMO AMBO - RUOTA [{self.wheel_name.upper()}]\n{'='*60}\n")
print(f"Gruppo di Riferimento: {NUM_GREGGE} numeri")
print(f"Ambo Predetto: {predictions[0]} - {predictions[1]}")
print("-" * 30)
plt.figure(figsize=(12, 6))
plt.plot(self.training_rewards); plt.title(f'Reward Totale per Episodio (Gregge) - Ruota di {self.wheel_name}')
plt.xlabel('Episodio'); plt.ylabel('Reward Totale Accumulato'); plt.grid(True); plt.tight_layout()
plt.savefig(f'gregge_training_rewards_{self.wheel_name.lower()}.png')
print(f"\nGrafico performance salvato come 'gregge_training_rewards_{self.wheel_name.lower()}.png'")
plt.show()
# ==============================================================================
# --- FLUSSO DI ESECUZIONE PRINCIPALE ---
# ==============================================================================
if __name__ == '__main__':
# Usa un nome file specifico per la ruota che stai analizzando
# Esempio: "archivio_firenze_1000.csv"
DATASET_FILE = 'archivio_ultime_820ca_sfaldamenti_su-NAPOLI.csv' # <--- METTI QUI IL NOME DEL TUO FILE CSV
predictor = LottoRLPredictor(wheel_name=WHEEL_NAME_TO_ANALYZE)
predictor.load_and_preprocess_data(DATASET_FILE)
if predictor.df is not None and not predictor.df.empty:
predictor.train_model(episodes=TOTAL_EPISODES, batch_size=BATCH_SIZE)
final_predictions = predictor.predict_next_extraction()
predictor.display_results(final_predictions)
else:
logging.error("Impossibile procedere: nessun dato valido caricato.")