# 🤖 BOT INTELLIGENTE v40_gold - FINAL FIX (RA/RS/INC REALI)
import numpy as np
import pandas as pd
import os
import time
import random
import json
import copy
import re
import math
from itertools import combinations
from numba import njit, prange
from tqdm.notebook import tqdm
from IPython.display import Audio, display, HTML
# ============================================================
# 0. FIX VISUALIZZAZIONE KAGGLE (FULL WIDTH)
# ============================================================
display(HTML("""
<style>
.container { width:100% !important; }
div.output_area { width: 100% !important; }
div.output_area pre {
white-space: pre-wrap;
word-wrap: break-word;
width: 100%;
overflow-x: hidden;
}
.widget-area { width: 100% !important; }
</style>
"""))
# ============================================================
# 1. CONFIGURAZIONE INPUT
# ============================================================
# --- SELEZIONE MODALITÀ INPUT ---
TIPO_INPUT = 'FILE' # 'FILE' o 'MANUALE'
# --- SELEZIONE OBIETTIVO DI STOP ---
MODALITA_STOP = 'GEMMA'
# ------------------------------------------------------------
# OPZIONE A: LISTA MANUALE
# ------------------------------------------------------------
MATRICE_MANUALE = [
[1,4,9,11,13,16,17,23,25,26,30,40,42,49,55,58,59,61,67,68,73,77,81,83,85,86,88,89,90],
]
# ------------------------------------------------------------
# OPZIONE B: FILE
# ------------------------------------------------------------
PATH_FILE_MATRICE = "/kaggle/input/14ineabsxesuttenzdal1871/solo14ineabsxEsuTTeNZdal1871solonumeri.txt"
# ------------------------------------------------------------
# IMPOSTAZIONI GENERALI
# ------------------------------------------------------------
RIGA_DA_ANALIZZARE = -2
TIMEOUT_MINUTI_PER_RIGA = 1
SOGLIA_COLPI_RIMANENTI = 9
GRUPPO_BASE_BACKUP = [1,4,9,11,13,16,17,23,25,26,30,40,42,49,55,58,59,61,67,68,73,77,81,83,85,86,88,89,90]
# ============================================================
# 2. OBIETTIVI & LIMITI
# ============================================================
TARGET_CASI_MINIMI = 1000
TARGET_PERC_MINIMA = 100
TARGET_COLPI_MAX = 60
MIN_NUMERI_GRUPPO = 2
MAX_NUMERI_GRUPPO = 2
MIN_SOGLIA = 18
MAX_SOGLIA = 18
POSSIBILI_SORTI_RICERCA = [1]
MIN_RUOTE_UNITE = 1
MAX_RUOTE_UNITE = 1
RUOTE_NOMI = ['BA','CA','FI','GE','MI','NA','PA','RO','TO','VE']
PATH_RUOTE = "/kaggle/input/estrazionilotto-aggiornate-al-20-1-2026"
SORTE_VERIFICA = 1
# ============================================================
# UTILITÀ
# ============================================================
def leggi_file_matrice(path):
if not os.path.exists(path):
print(f"⚠️ File non trovato: {path}")
return [GRUPPO_BASE_BACKUP]
matrice = []
try:
with open(path, 'r') as f:
for line in f:
line = line.strip()
if not line: continue
line_clean = re.sub(r'[^\d]+', ' ', line)
nums = [int(x) for x in line_clean.split() if x.isdigit()]
nums = sorted(list(set(nums)))
if len(nums) > 0:
matrice.append(nums)
except Exception as e:
print(f"⚠️ Errore lettura: {e}")
return [GRUPPO_BASE_BACKUP]
if not matrice: return [GRUPPO_BASE_BACKUP]
return matrice
def play_sound():
display(HTML("""<script>new Audio('https://www.soundjay.com/buttons/beep-01a.mp3').play();</script>"""))
def play_final_sound():
display(HTML("""
<script>
var a = new Audio('https://www.soundjay.com/buttons/beep-01a.mp3');
a.play(); setTimeout(function(){ a.play(); }, 500);
</script>
<b style="color:green; font-size:20px;">✅ ANALISI COMPLETATA!</b>
"""))
# ============================================================
# ENGINE
# ============================================================
def carica_bin():
estr = {}
min_len = 10**9
if not os.path.exists(PATH_RUOTE) and not os.path.exists(os.path.join(PATH_RUOTE, "estrazioni-BA.txt")):
min_len = 2000
binmat = np.zeros((min_len, len(RUOTE_NOMI), 90), dtype=np.uint8)
for t in range(min_len):
for r in range(len(RUOTE_NOMI)):
nums = np.random.choice(np.arange(90), 5, replace=False)
for n in nums: binmat[t, r, n] = 1
return binmat, min_len
for r in RUOTE_NOMI:
fname = os.path.join(PATH_RUOTE, f"estrazioni-{r}.txt")
if os.path.exists(fname):
rows = []
with open(fname) as f:
for l in f:
p = l.strip().split(".")
if len(p) == 5: rows.append([int(x) for x in p])
arr = np.array(rows, dtype=np.int16)
estr[r] = arr
min_len = min(min_len, len(arr))
# IMPORTANTE: Qui carichiamo in modo che binmat[0] sia l'estrazione PIÙ VECCHIA (1871)
# e binmat[min_len-1] sia quella ODIERNA.
# Questo perché estr[r] letto dal file ha [0]=Recente.
# [::-1] lo inverte. Quindi [0]=Vecchio.
for r in RUOTE_NOMI: estr[r] = estr[r][:min_len][::-1]
binmat = np.zeros((min_len, len(RUOTE_NOMI), 90), dtype=np.uint8)
for ri, r in enumerate(RUOTE_NOMI):
for t in range(min_len):
for n in estr[r][t]:
binmat[t, ri, n-1] = 1
return binmat, min_len
@njit(parallel=True)
def scan_core(binmat, combs, ruote_idx_list, sric, sver, thr_ric, thr_ver):
n_groups = ruote_idx_list.shape[0]
n_ruote_in_group = ruote_idx_list.shape[1]
ncomb = combs.shape[0]
nt = binmat.shape[0]
tot_ev = 0
tot_ok = 0
for g_idx in prange(n_groups):
for i in range(ncomb):
rit_ric = 0
in_ver = False
rit_ver = 0
for t in range(nt):
max_hit = 0
for r_sub_idx in range(n_ruote_in_group):
r_real = ruote_idx_list[g_idx, r_sub_idx]
current_hit = 0
for k in range(combs.shape[1]):
current_hit += binmat[t, r_real, combs[i,k]]
if current_hit > max_hit: max_hit = current_hit
if not in_ver:
if max_hit >= sric: rit_ric = 0
else: rit_ric += 1
if rit_ric >= thr_ric:
in_ver = True
rit_ver = 0
else:
rit_ver += 1
if max_hit >= sver:
tot_ev += 1
if rit_ver <= thr_ver: tot_ok += 1
in_ver = False
rit_ric = 0
return tot_ev, tot_ok
def valuta_stato(binmat, stato):
real_combs = np.array([ [x-1 for x in stato['gruppo']] ], dtype=np.int16)
n_r = stato['num_ruote']
ruote_combs = list(combinations(range(len(RUOTE_NOMI)), n_r))
ruote_idx_arr = np.array(ruote_combs, dtype=np.int32)
if ruote_idx_arr.ndim == 1:
ruote_idx_arr = ruote_idx_arr.reshape(-1, 1)
ev, ok = scan_core(
binmat, real_combs, ruote_idx_arr,
stato['sorte_ric'], SORTE_VERIFICA,
stato['soglia_ric'], TARGET_COLPI_MAX
)
perc = (ok / ev * 100) if ev > 0 else 0
return ev, ok, perc
@njit
def scan_dettagliata_single_group(binmat, combs, ruote_idx, sric, sver, thr_ric, thr_ver):
ncomb = combs.shape[0]
nt = binmat.shape[0]
n_ruote = ruote_idx.shape[0]
colpi_esito = np.zeros(5000, dtype=np.int16)
count_ev = 0
for i in range(ncomb):
rit_ric = 0
in_ver = False
rit_ver = 0
for t in range(nt):
max_hit = 0
for r_sub_idx in range(n_ruote):
r_real = ruote_idx[r_sub_idx]
current_hit = 0
for k in range(combs.shape[1]):
current_hit += binmat[t, r_real, combs[i,k]]
if current_hit > max_hit: max_hit = current_hit
if not in_ver:
if max_hit >= sric: rit_ric = 0
else: rit_ric += 1
if rit_ric >= thr_ric:
in_ver = True
rit_ver = 0
else:
rit_ver += 1
if max_hit >= sver:
if count_ev < 5000:
colpi_esito[count_ev] = rit_ver
count_ev += 1
in_ver = False
rit_ric = 0
return count_ev, colpi_esito
@njit
def check_active_status(binmat, combs, ruote_idx, sric, sver, thr_ric, thr_ver):
ncomb = combs.shape[0]
nt = binmat.shape[0]
n_ruote = ruote_idx.shape[0]
is_active = False
colpi_passati = 0
for i in range(ncomb):
rit_ric = 0
in_ver = False
rit_ver = 0
for t in range(nt):
max_hit = 0
for r_sub_idx in range(n_ruote):
r_real = ruote_idx[r_sub_idx]
current_hit = 0
for k in range(combs.shape[1]):
current_hit += binmat[t, r_real, combs[i,k]]
if current_hit > max_hit: max_hit = current_hit
if not in_ver:
if max_hit >= sric: rit_ric = 0
else: rit_ric += 1
if rit_ric >= thr_ric:
in_ver = True
rit_ver = 0
else:
rit_ver += 1
if max_hit >= sver:
in_ver = False
rit_ric = 0
if in_ver:
if rit_ver <= thr_ver:
is_active = True
colpi_passati = rit_ver
break
return is_active, colpi_passati
# ============================================================
# STATISTICHE STANDARD (FIX CRONOLOGICO DEFINITIVO)
# ============================================================
@njit
def calcola_statistiche_standard(binmat, combs, ruote_idx, sorte):
# binmat[0] = 1871 (VECCHIO)
# binmat[nt-1] = OGGI (NUOVO)
nt = binmat.shape[0]
n_ruote = ruote_idx.shape[0]
comb = combs[0]
# 1. CALCOLO RA (Ritardo Attuale)
# Dobbiamo partire dalla FINE (Oggi) e tornare indietro.
ra = 0
found_first = False
for i in range(nt):
t = nt - 1 - i # Indici: N-1 (Oggi), N-2, ... 0 (1871)
hit_count = 0
for r_sub_idx in range(n_ruote):
r_real = ruote_idx[r_sub_idx]
for k in range(len(comb)):
hit_count += binmat[t, r_real, comb[k]]
if hit_count >= sorte:
found_first = True
break # Trovato l'ultimo esito, fermiamo il conteggio RA
ra += 1
if not found_first: ra = nt # Mai uscito nella storia
# 2. CALCOLO RS PRECEDENTE (Max Gap Storico)
# Scorriamo dall'INIZIO (1871) alla FINE (Oggi)
max_rit_chiuso = 0
freq = 0
last_hit_t = -1
for t in range(nt): # t: 0, 1, ... N-1
hit_count = 0
for r_sub_idx in range(n_ruote):
r_real = ruote_idx[r_sub_idx]
for k in range(len(comb)):
hit_count += binmat[t, r_real, comb[k]]
if hit_count >= sorte:
freq += 1
if last_hit_t != -1:
gap = t - last_hit_t - 1
if gap > max_rit_chiuso: max_rit_chiuso = gap
else:
# Ritardo iniziale (da inizio storia alla prima uscita)
gap = t
if gap > max_rit_chiuso: max_rit_chiuso = gap
last_hit_t = t
# 3. OUTPUT
# RS Display: Il massimo tra l'attuale RA e il record storico precedente
rs_display = max(ra, max_rit_chiuso)
# INC: Differenza tra RA e il vecchio record
inc = ra - max_rit_chiuso
return ra, rs_display, freq, inc
# ============================================================
# CALCOLO PERCENTILI
# ============================================================
def calcola_distribuzione(arr_colpi):
if len(arr_colpi) == 0:
return 0, 0, 0
med = np.percentile(arr_colpi, 50)
p80 = np.percentile(arr_colpi, 80)
p95 = np.percentile(arr_colpi, 95)
return int(med), int(p80), int(p95)
# ============================================================
# REPORTISTICA
# ============================================================
def stampa_report_completo_per_riga(binmat, stato, ev, perc, titolo="REPORT"):
SEP = "=" * 65
SUB = "-" * 65
print("\n" + SEP)
print(f" 🛑 {titolo}")
print(SEP)
print(f"📊 RISULTATO: {ev} Eventi Totali | {perc:.2f}% Positivi")
print(SUB)
print("⚙️ CONFIGURAZIONE:")
print(f" • Numeri nel Gruppo : {len(stato['gruppo'])}")
print(f" • Ruote Unite : {stato['num_ruote']}")
print(f" • Sorte Ricerca (Trig.) : {stato['sorte_ric']}")
print(f" • Soglia Ritardo : {stato['soglia_ric']}")
print(SUB)
print(f"🎱 LUNGHETTA TROVATA ({len(stato['gruppo'])}):")
print(f" {stato['gruppo']}")
print(SEP)
# BACKTEST
print(" 🔎 ANALISI BACKTEST (STORICO TOP 10)")
print(SUB)
real_combs = np.array([ [x-1 for x in stato['gruppo']] ], dtype=np.int16)
n_r = stato['num_ruote']
ruote_combs_list = list(combinations(range(len(RUOTE_NOMI)), n_r))
results = []
for rc in ruote_combs_list:
single_ruote_idx = np.array(rc, dtype=np.int32)
cnt, colpi_arr = scan_dettagliata_single_group(
binmat, real_combs, single_ruote_idx,
stato['sorte_ric'], SORTE_VERIFICA,
stato['soglia_ric'], TARGET_COLPI_MAX
)
if cnt > 0:
valid_colpi = colpi_arr[:cnt]
hits = valid_colpi[valid_colpi <= TARGET_COLPI_MAX]
max_pos = np.max(hits) if len(hits) > 0 else 0
max_abs = np.max(valid_colpi) if len(valid_colpi) > 0 else 0
ok = len(hits)
perc_loc = (ok / cnt) * 100
med, p80, p95 = calcola_distribuzione(hits if len(hits) > 0 else valid_colpi)
negativi = valid_colpi[valid_colpi > TARGET_COLPI_MAX]
dettagli_negativi = []
if len(negativi) > 0:
for n in negativi[:5]: dettagli_negativi.append(f"{n}")
if len(negativi) > 5: dettagli_negativi.append("..")
neg_str = ",".join(dettagli_negativi) if dettagli_negativi else "OK"
results.append({
"Ruote": "-".join([RUOTE_NOMI[x] for x in rc]),
"Ev": cnt, "Perc": perc_loc, "Out": neg_str,
"MaxPos": max_pos, "MaxAbs": max_abs,
"Med": med, "P80": p80, "P95": p95
})
df = pd.DataFrame(results)
if not df.empty:
df = df.sort_values(by=["Perc", "Ev"], ascending=False)
for idx, row in df.head(10).iterrows():
print(f"📌 {row['Ruote']} -> {row['Perc']:.2f}% ({row['Ev']} cs)")
print(f" ⏱️ Max In: {row['MaxPos']} | ⚠️ Max Abs: {row['MaxAbs']}")
if row['Perc'] < 100: print(f" ⚠️ Ritardatari: {row['Out']}")
else: print(f" ✅ PERFETTO")
print(SUB)
else:
print("Nessun evento trovato.")
# ATTIVE
print("\n 🔮 PREVISIONI ATTIVE (TUTTE)")
print(SUB)
found_any = False
for rc in ruote_combs_list:
single_ruote_idx = np.array(rc, dtype=np.int32)
active, colpi_passati = check_active_status(
binmat, real_combs, single_ruote_idx,
stato['sorte_ric'], SORTE_VERIFICA,
stato['soglia_ric'], TARGET_COLPI_MAX
)
if active:
found_any = True
colpi_rim = TARGET_COLPI_MAX - colpi_passati
nomi = "-".join([RUOTE_NOMI[x] for x in rc])
cnt, colpi_arr = scan_dettagliata_single_group(
binmat, real_combs, single_ruote_idx,
stato['sorte_ric'], SORTE_VERIFICA,
stato['soglia_ric'], TARGET_COLPI_MAX
)
perc_loc = 0
max_pos = 0
max_abs = 0
med, p80, p95 = 0, 0, 0
if cnt > 0:
valid = colpi_arr[:cnt]
hits = valid[valid <= TARGET_COLPI_MAX]
if len(hits) > 0: max_pos = np.max(hits)
max_abs = np.max(valid) if len(valid) > 0 else 0
perc_loc = (len(hits) / cnt) * 100
med, p80, p95 = calcola_distribuzione(hits if len(hits) > 0 else valid)
icon = "🟢" if perc_loc == 100 else "🟡"
print(f"{icon} {nomi}")
print(f" 🔥 Attiva da: {colpi_passati} | 🎲 Gioca per: {colpi_rim}")
print(f" 📊 Storico: {perc_loc:.1f}% | ⏱️ Max In: {max_pos} | ⚠️ Max Abs: {max_abs}")
print(f" 📉 Distrib: Med:{med} | p80:{p80} | p95:{p95}")
print(SUB)
if not found_any: print("ℹ️ Nessuna previsione attiva per questa riga.")
print(SEP + "\n")
# ============================================================
# CRUSCOTTO FINALE (FIX RA/RS/INC)
# ============================================================
def stampa_cruscotto_finale(binmat, risultati_globali):
if not risultati_globali: return
SEP = "=" * 70
print("\n\n")
print(SEP)
print(" 🚀 CRUSCOTTO FINALE DI SINTESI")
print(f" (Mostro SOLO previsioni urgenti con ≤ {SOGLIA_COLPI_RIMANENTI} colpi rimanenti)")
print(SEP)
previsioni_raccolte = []
global_max_in_target = 0
global_count_in = 0
global_max_absolute = 0
global_count_abs = 0
global_max_p95 = 0
best_historical_gem = None
min_max_abs_ever = 9999
for nome_job, stato in risultati_globali:
real_combs = np.array([ [x-1 for x in stato['gruppo']] ], dtype=np.int16)
n_r = stato['num_ruote']
ruote_combs_list = list(combinations(range(len(RUOTE_NOMI)), n_r))
for rc in ruote_combs_list:
single_ruote_idx = np.array(rc, dtype=np.int32)
cnt, colpi_arr = scan_dettagliata_single_group(
binmat, real_combs, single_ruote_idx,
stato['sorte_ric'], SORTE_VERIFICA,
stato['soglia_ric'], TARGET_COLPI_MAX
)
med, p80, p95 = 0, 0, 0
local_max_pos = 0
local_max_abs = 0
if cnt > 0:
valid = colpi_arr[:cnt]
hits = valid[valid <= TARGET_COLPI_MAX]
if len(hits) > 0:
local_max_in = np.max(hits)
local_count_in = np.sum(hits == local_max_in)
if local_max_in > global_max_in_target:
global_max_in_target = local_max_in
global_count_in = local_count_in
elif local_max_in == global_max_in_target:
global_count_in += local_count_in
local_max_pos = local_max_in
med, p80, p95 = calcola_distribuzione(hits)
if p95 > global_max_p95: global_max_p95 = p95
if len(valid) > 0:
local_max_abs = np.max(valid)
local_count_abs = np.sum(valid == local_max_abs)
if local_max_abs > global_max_absolute:
global_max_absolute = local_max_abs
global_count_abs = local_count_abs
elif local_max_abs == global_max_absolute:
global_count_abs += local_count_abs
# GEMMA STORICA
perc_totale = (np.sum(valid <= TARGET_COLPI_MAX) / cnt) * 100
if perc_totale >= 95 and cnt > 10:
if local_max_abs < min_max_abs_ever:
min_max_abs_ever = local_max_abs
ra_std, rs_std, fq_std, inc_std = calcola_statistiche_standard(binmat, real_combs, single_ruote_idx, SORTE_VERIFICA)
best_historical_gem = {
'ID': nome_job,
'Ruote': "-".join([RUOTE_NOMI[x] for x in rc]),
'Numeri': str(stato['gruppo']),
'MaxAbs': local_max_abs,
'Perc': perc_totale,
'Casi': cnt,
'RA_STD': ra_std, 'RS_STD': rs_std, 'FQ_STD': fq_std, 'INC_STD': inc_std
}
active, colpi_passati = check_active_status(
binmat, real_combs, single_ruote_idx,
stato['sorte_ric'], SORTE_VERIFICA,
stato['soglia_ric'], TARGET_COLPI_MAX
)
if active:
colpi_rim = TARGET_COLPI_MAX - colpi_passati
if colpi_rim <= SOGLIA_COLPI_RIMANENTI:
perc_loc = 0
if cnt > 0:
hits = valid[valid <= TARGET_COLPI_MAX]
perc_loc = (len(hits) / cnt) * 100
ra_std, rs_std, fq_std, inc_std = calcola_statistiche_standard(binmat, real_combs, single_ruote_idx, SORTE_VERIFICA)
previsioni_raccolte.append({
'ID': nome_job,
'Ruote': "-".join([RUOTE_NOMI[x] for x in rc]),
'Numeri': str(stato['gruppo']),
'Passati': colpi_passati,
'Rimasti': colpi_rim,
'Storico': perc_loc,
'Casi': cnt,
'MaxPos': local_max_pos,
'MaxAbs': local_max_abs,
'Med': med, 'P80': p80, 'P95': p95,
'RA_STD': ra_std, 'RS_STD': rs_std, 'FQ_STD': fq_std, 'INC_STD': inc_std
})
if not previsioni_raccolte:
print("ℹ️ Nessuna previsione soddisfa i criteri di alta urgenza.")
else:
df = pd.DataFrame(previsioni_raccolte)
df = df.sort_values(by=['Rimasti', 'Storico'], ascending=[True, False])
for idx, row in df.iterrows():
icona = "🔴" if row['Rimasti'] <= 1 else "⚡"
print(f"{icona} [{row['ID']}] {row['Ruote']} -> {row['Numeri']}")
print(f" ⏳ RIMANGONO: {row['Rimasti']} colpi (Attiva da {row['Passati']})")
print(f" 📊 Affidabilità: {row['Storico']:.1f}% su {row['Casi']} casi")
print(f" ⏱️ Max In: {row['MaxPos']} | ⚠️ Max Abs: {row['MaxAbs']}")
print(f" 📉 Distrib: Med:{row['Med']} | p80:{row['P80']} | p95:{row['P95']}")
# STAMPA STANDARD CON INC CALCOLATO CORRETTAMENTE
print(f" 📈 STANDARD: RA: {row['RA_STD']} | RS: {row['RS_STD']} | FQ: {row['FQ_STD']} | INC: {row['INC_STD']:+d}")
print("-" * 70)
print(SEP)
# RIEPILOGO CONVERGENZE
print("\n\n")
print("=" * 70)
print(" 🧩 RIEPILOGO CONVERGENZE (Ruote Unite)")
print("=" * 70)
groups = {}
for p in previsioni_raccolte:
key = (p['Numeri'], p['Rimasti'])
ruote_set = set(p['Ruote'].split('-'))
if key not in groups:
groups[key] = []
groups[key].append(ruote_set)
any_agg_found = False
sorted_keys = sorted(groups.keys(), key=lambda x: x[1])
for k in sorted_keys:
nums_str, rimasti = k
sets_list = groups[k]
if len(sets_list) < 2: continue
clusters = []
for s in sets_list:
candidates_idx = []
for i, c in enumerate(clusters):
if not s.isdisjoint(c['ruote']):
candidates_idx.append(i)
if not candidates_idx:
clusters.append({'ruote': s, 'count': 1})
else:
base_idx = candidates_idx[0]
clusters[base_idx]['ruote'].update(s)
clusters[base_idx]['count'] += 1
for other_idx in sorted(candidates_idx[1:], reverse=True):
clusters[base_idx]['ruote'].update(clusters[other_idx]['ruote'])
clusters[base_idx]['count'] += clusters[other_idx]['count']
clusters.pop(other_idx)
all_ruote_ordered = ['BA','CA','FI','GE','MI','NA','PA','RO','TO','VE']
for c in clusters:
if c['count'] >= 2:
any_agg_found = True
rr = sorted(list(c['ruote']), key=lambda x: all_ruote_ordered.index(x) if x in all_ruote_ordered else 99)
ruote_str_compact = "".join(rr)
print(f"🔗 {ruote_str_compact} {nums_str} -> ⏳ {rimasti} colpi teorici")
print("-" * 70)
if not any_agg_found:
print("ℹ️ Nessuna convergenza complessa trovata.")
# RECORD GLOBALI
print("\n\n")
print("=" * 70)
print(" 🏆 RECORD STORICI DELL'INTERA RICERCA")
print("=" * 70)
print(f" • ⏱️ Max Ritardo (In Target) : {global_max_in_target} (verificato {global_count_in} volte)")
print(f" • ⚠️ Max Ritardo Assoluto : {global_max_absolute} (verificato {global_count_abs} volte)")
print(f" • 📈 Max P95 Globale : {global_max_p95} (Il 95% dei casi esce entro questo valore)")
if best_historical_gem:
print("-" * 70)
print(" 🌟 LA GEMMA STORICA (Miglior Max Assoluto registrato)")
print(f" [{best_historical_gem['ID']}] {best_historical_gem['Ruote']} -> {best_historical_gem['Numeri']}")
print(f" ⏱️ Max Assoluto Storico: {best_historical_gem['MaxAbs']} (su {best_historical_gem['Casi']} casi, {best_historical_gem['Perc']:.1f}%)")
print(f" 📊 Dati Std: RA: {best_historical_gem['RA_STD']} | RS: {best_historical_gem['RS_STD']} | FQ: {best_historical_gem['FQ_STD']} | INC: {best_historical_gem['INC_STD']:+d}")
print("=" * 70)
# LA GEMMA (ATTUALE)
print("\n\n")
print("=" * 70)
print(" 💎 REPORT RICERCA: LA GEMMA ATTUALE (Perfect Match)")
print(f" Criteria: Casi>={TARGET_CASI_MINIMI} | Perc>={TARGET_PERC_MINIMA}% | Rimasti<={SOGLIA_COLPI_RIMANENTI}")
print("=" * 70)
gemme = []
if previsioni_raccolte:
df_gems = pd.DataFrame(previsioni_raccolte)
gemme = df_gems[
(df_gems['Casi'] >= TARGET_CASI_MINIMI) &
(df_gems['Storico'] >= TARGET_PERC_MINIMA) &
(df_gems['Rimasti'] <= SOGLIA_COLPI_RIMANENTI)
]
if len(gemme) > 0:
print(f" 🎉 TROVATE {len(gemme)} CONFIGURAZIONI PERFETTE (GEMME)!\n")
for idx, row in gemme.iterrows():
print(f" 💎 [{row['ID']}] {row['Ruote']} -> {row['Numeri']}")
print(f" ✅ {row['Storico']:.1f}% su {row['Casi']} casi")
print(f" ⏳ {row['Rimasti']} colpi rimanenti")
print(f" ⏱️ Max In: {row['MaxPos']} | ⚠️ Max Abs: {row['MaxAbs']}")
print(f" 📉 Distrib: Med:{row['Med']} | p80:{row['P80']} | p95:{row['P95']}")
print(f" 📊 Dati Std: RA: {row['RA_STD']} | RS: {row['RS_STD']} | FQ: {row['FQ_STD']} | INC: {row['INC_STD']:+d}")
print("-" * 70)
else:
print(" 🌑 NESSUNA GEMMA TROVATA che soddisfi TUTTI i requisiti contemporaneamente.")
print("=" * 70)
# ============================================================
# OPTIMIZATION FUNCTION
# ============================================================
def run_optimization_for_pool(binmat, pool_numeri, id_riga_display):
SEP = "=" * 55
print("\n" + SEP)
print(f" ⚡ AVVIO ANALISI: {id_riga_display}")
print(f" 🎱 Numeri: {pool_numeri}")
print(SEP)
start_size = random.randint(MIN_NUMERI_GRUPPO, MAX_NUMERI_GRUPPO)
if len(pool_numeri) < start_size: start_size = len(pool_numeri)
current_stato = {
'gruppo': sorted(random.sample(pool_numeri, start_size)),
'soglia_ric': random.randint(MIN_SOGLIA, MAX_SOGLIA),
'sorte_ric': random.choice(POSSIBILI_SORTI_RICERCA),
'num_ruote': random.randint(MIN_RUOTE_UNITE, MAX_RUOTE_UNITE)
}
cur_ev, cur_ok, cur_perc = valuta_stato(binmat, current_stato)
best_stato = copy.deepcopy(current_stato)
best_ev, best_ok, best_perc = cur_ev, cur_ok, cur_perc
pbar = tqdm(desc=f"Opt {id_riga_display}", unit="it", dynamic_ncols=True)
stagnation_zero = 0
start_time = time.time()
timeout_seconds = TIMEOUT_MINUTI_PER_RIGA * 60
try:
while True:
# CHECK TIMEOUT
if time.time() - start_time > timeout_seconds:
pbar.close()
stampa_report_completo_per_riga(binmat, best_stato, best_ev, best_perc, f"TIMEOUT {id_riga_display}")
return best_stato
# CHECK TARGETS PRIMARI
targets_met = (best_ev >= TARGET_CASI_MINIMI and best_perc >= TARGET_PERC_MINIMA)
if targets_met:
stop_optimization = False
if MODALITA_STOP == 'STANDARD':
stop_optimization = True
msg_vittoria = f"VITTORIA (STANDARD) {id_riga_display}"
elif MODALITA_STOP == 'GEMMA':
real_combs = np.array([ [x-1 for x in best_stato['gruppo']] ], dtype=np.int16)
n_r = best_stato['num_ruote']
ruote_combs = list(combinations(range(len(RUOTE_NOMI)), n_r))
found_gem = False
for rc in ruote_combs:
single_ruote_idx = np.array(rc, dtype=np.int32)
active, colpi_passati = check_active_status(
binmat, real_combs, single_ruote_idx,
best_stato['sorte_ric'], SORTE_VERIFICA,
best_stato['soglia_ric'], TARGET_COLPI_MAX
)
if active:
rimanenti = TARGET_COLPI_MAX - colpi_passati
if rimanenti <= SOGLIA_COLPI_RIMANENTI:
found_gem = True
break
if found_gem:
stop_optimization = True
msg_vittoria = f"VITTORIA (GEMMA TROVATA) {id_riga_display}"
if stop_optimization:
pbar.close()
play_sound()
stampa_report_completo_per_riga(binmat, best_stato, best_ev, best_perc, msg_vittoria)
return best_stato
pbar.update(1)
pbar.set_description(f"Best: {best_perc:.1f}% ({best_ev})")
pbar.set_postfix(
L=len(new_stato['gruppo'] if 'new_stato' in locals() else current_stato['gruppo']),
R=current_stato['num_ruote'],
Sg=current_stato['soglia_ric'],
LastEv=cur_ev
)
if best_ev == 0:
stagnation_zero += 1
if stagnation_zero > 50:
current_stato['soglia_ric'] = random.randint(MIN_SOGLIA, MAX_SOGLIA)
stagnation_zero = 0
new_stato = copy.deepcopy(current_stato)
dice = random.random()
if dice < 0.25:
r2 = random.random()
if r2 < 0.33:
new_stato['soglia_ric'] += random.choice([-10, -5, 5, 10])
new_stato['soglia_ric'] = max(MIN_SOGLIA, min(MAX_SOGLIA, new_stato['soglia_ric']))
elif r2 < 0.66:
new_stato['num_ruote'] += random.choice([-1, 1])
new_stato['num_ruote'] = max(MIN_RUOTE_UNITE, min(MAX_RUOTE_UNITE, new_stato['num_ruote']))
else:
new_stato['sorte_ric'] = random.choice(POSSIBILI_SORTI_RICERCA)
elif dice < 0.45:
curr_len = len(new_stato['gruppo'])
action = 0
if curr_len < MAX_NUMERI_GRUPPO and curr_len > MIN_NUMERI_GRUPPO: action = random.choice([-1, 1])
elif curr_len == MIN_NUMERI_GRUPPO and curr_len < MAX_NUMERI_GRUPPO: action = 1
elif curr_len == MAX_NUMERI_GRUPPO and curr_len > MIN_NUMERI_GRUPPO: action = -1
if action == 1:
pool = [x for x in pool_numeri if x not in new_stato['gruppo']]
if pool:
new_stato['gruppo'].append(random.choice(pool))
new_stato['gruppo'].sort()
elif action == -1:
new_stato['gruppo'].pop(random.randint(0, len(new_stato['gruppo'])-1))
else:
n_chg = 1
if cur_ev < TARGET_CASI_MINIMI / 2: n_chg = 2
for _ in range(n_chg):
if len(new_stato['gruppo']) > 0: new_stato['gruppo'].pop(random.randint(0, len(new_stato['gruppo'])-1))
pool = [n for n in pool_numeri if n not in new_stato['gruppo']]
while len(new_stato['gruppo']) < len(current_stato['gruppo']):
if not pool: break
r = random.choice(pool)
new_stato['gruppo'].append(r)
pool.remove(r)
new_stato['gruppo'].sort()
ev, ok, perc = valuta_stato(binmat, new_stato)
accepted = False
if ev > 0:
if perc > best_perc: accepted = True
elif perc == best_perc and ev > best_ev: accepted = True
elif random.random() < 0.05: accepted = True
if accepted:
current_stato = new_stato
if perc > best_perc or (perc == best_perc and ev > best_ev):
best_stato = copy.deepcopy(new_stato)
best_ev, best_ok, best_perc = ev, ok, perc
tqdm.write(f"★ {ev} ({perc:.1f}%) | L:{len(new_stato['gruppo'])} R{new_stato['num_ruote']} Sg{new_stato['soglia_ric']}")
cur_ev = ev
except KeyboardInterrupt:
pbar.close()
print(f"⚠️ STOP MANUALE SU {id_riga_display}")
stampa_report_completo_per_riga(binmat, best_stato, best_ev, best_perc, f"INTERROTTO {id_riga_display}")
return best_stato
# ============================================================
# MAIN
# ============================================================
def main_autonomous():
binmat, num_estr = carica_bin()
print("\n" + "="*60)
print(" 🤖 BOT INTELLIGENTE v40_gold - FINAL FIX")
print("="*60)
# 1. INFO DATI
print("📂 DATI ESTRAZIONI:")
print(f" • Database: {PATH_RUOTE}")
print(f" • Estrazioni caricate: {num_estr}")
print("-" * 60)
modalita_effettiva = 'FILE'
if TIPO_INPUT.strip().upper() == 'MANUALE':
modalita_effettiva = 'MANUALE'
# 2. INFO INPUT
print("📊 INPUT MATRICE:")
if modalita_effettiva == 'FILE':
print(f" • Modalità: FILE")
print(f" • Path: {PATH_FILE_MATRICE}")
else:
print(f" • Modalità: LISTA MANUALE (Hard-Coded)")
print(f" • Righe caricate: {len(MATRICE_MANUALE)}")
print(f" • Strategia Riga: {RIGA_DA_ANALIZZARE}")
print(f" • Modalità Stop: {MODALITA_STOP} ", end="")
if MODALITA_STOP == 'GEMMA': print("(Cerca Casi + % + Colpi)")
else: print("(Cerca solo Casi + %)")
print("-" * 60)
# 3. PARAMETRI
print("⚙️ PARAMETRI DI RICERCA:")
print(f" • Lunghezza Gruppo: {MIN_NUMERI_GRUPPO} - {MAX_NUMERI_GRUPPO}")
print(f" • Ruote Unite: {MIN_RUOTE_UNITE} - {MAX_RUOTE_UNITE}")
print(f" • Range Ritardo: {MIN_SOGLIA} - {MAX_SOGLIA}")
print(f" • Timeout per Riga: {TIMEOUT_MINUTI_PER_RIGA} minuti")
print("-" * 60)
# 4. OBIETTIVI
print("🏆 OBIETTIVI & OUTPUT:")
print(f" • Target Minimo: >{TARGET_CASI_MINIMI} Eventi | {TARGET_PERC_MINIMA}% Ok")
print(f" • Filtro Finale: Mostra solo previsioni con ≤ {SOGLIA_COLPI_RIMANENTI} colpi.")
print("="*60 + "\n")
matrice_completa = []
if modalita_effettiva == 'FILE':
matrice_completa = leggi_file_matrice(PATH_FILE_MATRICE)
else:
matrice_completa = MATRICE_MANUALE
cleaned = []
for row in matrice_completa:
if isinstance(row, list):
cleaned.append(sorted(list(set(row))))
matrice_completa = cleaned
if not matrice_completa:
print("⚠️ ERRORE: Nessun dato trovato in input (o lista manuale vuota). Uso gruppo backup.")
matrice_completa = [GRUPPO_BASE_BACKUP]
pools_da_analizzare = []
if RIGA_DA_ANALIZZARE == -1:
pool_totale = set()
for r in matrice_completa: pool_totale.update(r)
pools_da_analizzare.append(("ALL_MERGED", sorted(list(pool_totale))))
elif RIGA_DA_ANALIZZARE == -2:
for i, r in enumerate(matrice_completa): pools_da_analizzare.append((f"RIGA_{i}", r))
elif 0 <= RIGA_DA_ANALIZZARE < len(matrice_completa):
pools_da_analizzare.append((f"RIGA_{RIGA_DA_ANALIZZARE}", matrice_completa[RIGA_DA_ANALIZZARE]))
else:
pools_da_analizzare.append(("RIGA_0", matrice_completa[0]))
risultati_globali = []
try:
for nome_job, pool in pools_da_analizzare:
best_state = run_optimization_for_pool(binmat, pool, nome_job)
if best_state:
risultati_globali.append((nome_job, best_state))
time.sleep(1)
play_final_sound()
stampa_cruscotto_finale(binmat, risultati_globali)
except KeyboardInterrupt:
print("\n🛑 INTERRUZIONE. Genero report riassuntivo del lavoro svolto...")
stampa_cruscotto_finale(binmat, risultati_globali)
if __name__ == "__main__":
main_autonomous()