Files
NoIdle/CLIENTE_CORRIGIDO.py
root 6086c13be7 feat: Implementação completa do NoIdle - Cliente, Backend e Scripts
- Cliente Windows com modo silencioso e auto-start robusto
- Backend Node.js + API REST
- Frontend Next.js + Dashboard
- Scripts PowerShell de configuração e diagnóstico
- Documentação completa
- Build scripts para Windows e Linux
- Solução de auto-start após reinicialização

Resolução do problema: Cliente não voltava ativo após reboot
Solução: Registro do Windows + Task Scheduler + Modo silencioso
2025-11-16 22:56:35 +00:00

839 lines
29 KiB
Python

import os
import sys
import json
import time
import psutil
import requests
import win32gui
import win32process
import win32api
import win32evtlog
import win32evtlogutil
import winreg
import sqlite3
import shutil
from datetime import datetime
from pathlib import Path
import pystray
from PIL import Image, ImageDraw
from threading import Thread
import schedule
import atexit
import argparse
import subprocess
API_URL = "https://admin.noidle.tech/api"
CONFIG_FILE = Path(os.getenv('APPDATA')) / 'NoIdle' / 'config.json'
INSTALL_DIR = Path(os.getenv('ProgramFiles')) / 'NoIdle'
INSTALL_EXE = INSTALL_DIR / 'NoIdle.exe'
MONITOR_INTERVAL = 10 # 10 segundos (mais frequente para capturar mudanças)
IDLE_THRESHOLD = 30 # 30 segundos para considerar ocioso
class Config:
def __init__(self):
self.config_dir = CONFIG_FILE.parent
self.config_dir.mkdir(parents=True, exist_ok=True)
self.data = self.load()
def load(self):
if CONFIG_FILE.exists():
with open(CONFIG_FILE, 'r') as f:
return json.load(f)
return {}
def save(self):
with open(CONFIG_FILE, 'w') as f:
json.dump(self.data, f, indent=2)
def get(self, key, default=None):
return self.data.get(key, default)
def set(self, key, value):
self.data[key] = value
self.save()
def has(self, key):
return key in self.data
class ActivationWindow:
def __init__(self):
self.activation_key = None
self.result = None
def show(self):
import tkinter as tk
from tkinter import messagebox
root = tk.Tk()
root.title("NoIdle - Ativacao")
root.geometry("500x300")
root.resizable(False, False)
root.eval('tk::PlaceWindow . center')
title = tk.Label(root, text="NoIdle", font=("Arial", 24, "bold"), fg="#2563eb")
title.pack(pady=20)
subtitle = tk.Label(root, text="Zero Idle, Maximum Productivity", font=("Arial", 10))
subtitle.pack()
instruction = tk.Label(root, text="Insira a chave de ativacao:", font=("Arial", 10))
instruction.pack(pady=20)
key_var = tk.StringVar()
entry = tk.Entry(root, textvariable=key_var, font=("Courier", 12), width=40, justify="center")
entry.pack(pady=10)
entry.focus()
error_label = tk.Label(root, text="", fg="red", font=("Arial", 9))
error_label.pack()
def on_activate():
key = key_var.get().strip()
if not key:
error_label.config(text="Insira uma chave")
return
error_label.config(text="Ativando...")
root.update()
try:
result = activate_device(key)
if result.get('success'):
self.activation_key = key
self.result = result
messagebox.showinfo("Sucesso", "Dispositivo ativado com sucesso!")
root.destroy()
else:
error_label.config(text=result.get('message', 'Erro ao ativar'))
except Exception as e:
error_label.config(text=str(e))
btn = tk.Button(root, text="Ativar", command=on_activate, bg="#2563eb", fg="white", font=("Arial", 11, "bold"), padx=30, pady=10)
btn.pack(pady=20)
entry.bind('<Return>', lambda e: on_activate())
root.mainloop()
return self.result
def activate_device(activation_key):
try:
import platform
response = requests.post(
f"{API_URL}/devices/activate",
json={
'activation_key': activation_key,
'device_name': os.environ.get('COMPUTERNAME', 'Unknown'),
'hostname': os.environ.get('COMPUTERNAME', 'Unknown'),
'username': os.environ.get('USERNAME', 'Unknown')
},
timeout=10
)
result = response.json()
if result.get('success') and 'device_id' in result:
return {'success': True, 'device_id': result['device_id']}
return result
except Exception as e:
return {'success': False, 'message': str(e)}
def get_chrome_history():
"""Captura histórico recente do Chrome"""
try:
username = os.environ.get('USERNAME')
history_db = Path(f"C:/Users/{username}/AppData/Local/Google/Chrome/User Data/Default/History")
if not history_db.exists():
return []
# Copiar database porque Chrome mantém lock
temp_db = Path(os.getenv('TEMP')) / 'chrome_history_temp.db'
try:
shutil.copy2(history_db, temp_db)
except Exception as e:
print(f"Erro ao copiar histórico do Chrome: {e}")
return []
conn = sqlite3.connect(str(temp_db))
cursor = conn.cursor()
# Pegar URLs dos últimos 2 minutos
try:
cursor.execute("""
SELECT url, title, last_visit_time
FROM urls
WHERE last_visit_time > (strftime('%s','now') - 120) * 1000000 + 11644473600000000
ORDER BY last_visit_time DESC
LIMIT 20
""")
results = cursor.fetchall()
except Exception as e:
print(f"Erro ao ler histórico do Chrome: {e}")
results = []
finally:
conn.close()
temp_db.unlink(missing_ok=True)
urls = []
for url, title, timestamp in results:
if url and url.strip():
urls.append({
'url': url,
'title': title or url,
'browser': 'Chrome'
})
return urls
except Exception as e:
print(f"Erro ao ler Chrome: {e}")
return []
def get_edge_history():
"""Captura histórico recente do Edge"""
try:
username = os.environ.get('USERNAME')
history_db = Path(f"C:/Users/{username}/AppData/Local/Microsoft/Edge/User Data/Default/History")
if not history_db.exists():
return []
temp_db = Path(os.getenv('TEMP')) / 'edge_history_temp.db'
try:
shutil.copy2(history_db, temp_db)
except Exception as e:
print(f"Erro ao copiar histórico do Edge: {e}")
return []
conn = sqlite3.connect(str(temp_db))
cursor = conn.cursor()
try:
cursor.execute("""
SELECT url, title, last_visit_time
FROM urls
WHERE last_visit_time > (strftime('%s','now') - 120) * 1000000 + 11644473600000000
ORDER BY last_visit_time DESC
LIMIT 20
""")
results = cursor.fetchall()
except Exception as e:
print(f"Erro ao ler histórico do Edge: {e}")
results = []
finally:
conn.close()
temp_db.unlink(missing_ok=True)
urls = []
for url, title, timestamp in results:
if url and url.strip():
urls.append({
'url': url,
'title': title or url,
'browser': 'Edge'
})
return urls
except Exception as e:
print(f"Erro ao ler Edge: {e}")
return []
def get_firefox_history():
"""Captura histórico recente do Firefox"""
try:
username = os.environ.get('USERNAME')
# Firefox pode ter múltiplos perfis
firefox_profiles = Path(f"C:/Users/{username}/AppData/Roaming/Mozilla/Firefox/Profiles")
if not firefox_profiles.exists():
return []
urls = []
# Procurar em todos os perfis
for profile_dir in firefox_profiles.iterdir():
if not profile_dir.is_dir():
continue
places_db = profile_dir / 'places.sqlite'
if not places_db.exists():
continue
# Copiar database porque Firefox mantém lock
temp_db = Path(os.getenv('TEMP')) / f'firefox_history_temp_{profile_dir.name}.db'
try:
shutil.copy2(places_db, temp_db)
except Exception as e:
print(f"Erro ao copiar histórico do Firefox: {e}")
continue
conn = sqlite3.connect(str(temp_db))
cursor = conn.cursor()
try:
# Firefox usa formato diferente: moz_places e moz_historyvisits
# last_visit_date está em microssegundos desde epoch
two_minutes_ago = (time.time() - 120) * 1000000
cursor.execute("""
SELECT p.url, p.title, MAX(v.visit_date) as last_visit
FROM moz_places p
JOIN moz_historyvisits v ON p.id = v.place_id
WHERE v.visit_date > ?
GROUP BY p.id
ORDER BY last_visit DESC
LIMIT 20
""", (two_minutes_ago,))
results = cursor.fetchall()
for url, title, timestamp in results:
if url and url.strip():
urls.append({
'url': url,
'title': title or url,
'browser': 'Firefox'
})
except Exception as e:
print(f"Erro ao ler histórico do Firefox: {e}")
finally:
conn.close()
temp_db.unlink(missing_ok=True)
return urls
except Exception as e:
print(f"Erro ao ler Firefox: {e}")
return []
def send_activity_log(device_id, window_title, application_name, idle_time_seconds=0, urls=None):
try:
data = {
'device_id': device_id,
'window_title': window_title,
'application_name': application_name,
'idle_time_seconds': idle_time_seconds
}
# Adicionar URLs se houver
if urls and len(urls) > 0:
data['urls'] = urls
response = requests.post(
f"{API_URL}/activity/log",
json=data,
timeout=10
)
if response.status_code == 200:
print(f"✅ Atividade enviada: {application_name} - {window_title[:50]}")
else:
print(f"⚠️ Erro ao enviar atividade: {response.status_code}")
except Exception as e:
print(f"❌ Erro ao enviar log: {e}")
def send_session_event(device_id, event_type):
"""Envia evento de login/logout"""
try:
response = requests.post(
f"{API_URL}/activity/session",
json={
'device_id': device_id,
'event_type': event_type,
'username': os.environ.get('USERNAME', 'Unknown')
},
timeout=10
)
if response.status_code == 200:
print(f"✅ Evento de sessão enviado: {event_type}")
else:
print(f"⚠️ Erro ao enviar evento: {response.status_code}")
except Exception as e:
print(f"❌ Erro ao enviar evento de sessão: {e}")
def get_active_window_info():
"""Captura informações da janela ativa - CORRIGIDO"""
try:
# Obter janela ativa
hwnd = win32gui.GetForegroundWindow()
if hwnd == 0:
return None
# Obter título da janela
window_title = win32gui.GetWindowText(hwnd)
# Obter processo
try:
_, pid = win32process.GetWindowThreadProcessId(hwnd)
process = psutil.Process(pid)
process_name = process.name()
# Garantir que tenha .exe
if not process_name.lower().endswith('.exe'):
process_name = f"{process_name}.exe"
# Se não conseguiu obter título, usar nome do processo
if not window_title or window_title.strip() == '':
window_title = process_name
# NUNCA retornar "System Idle" ou "[IDLE]" se há uma janela ativa
if window_title.lower() in ['system idle', '[idle]', 'idle']:
window_title = process_name
return {
'process_name': process_name,
'window_title': window_title
}
except psutil.NoSuchProcess:
# Processo não existe mais, mas ainda temos o título
if window_title and window_title.strip():
return {
'process_name': 'Unknown.exe',
'window_title': window_title
}
return None
except Exception as e:
print(f"Erro ao obter processo: {e}")
# Se conseguiu o título, usar mesmo sem processo
if window_title and window_title.strip():
return {
'process_name': 'Unknown.exe',
'window_title': window_title
}
return None
except Exception as e:
print(f"Erro ao obter janela ativa: {e}")
return None
def get_idle_time():
"""Calcula tempo ocioso em segundos"""
try:
last_input_info = win32api.GetLastInputInfo()
tick_count = win32api.GetTickCount()
idle_time = (tick_count - last_input_info) / 1000.0
return idle_time
except Exception as e:
print(f"Erro ao calcular tempo ocioso: {e}")
return 0
def monitor_activity():
"""Monitora atividade + URLs de navegadores - CORRIGIDO"""
config = Config()
device_id = config.get('device_id')
if not device_id:
print("⚠️ Device ID não configurado")
return
# Calcular tempo ocioso
idle_time = get_idle_time()
idle_time_seconds = int(idle_time)
# Tentar capturar janela ativa
window_info = get_active_window_info()
# Coletar URLs dos navegadores
urls = []
try:
urls.extend(get_chrome_history())
urls.extend(get_edge_history())
urls.extend(get_firefox_history())
except Exception as e:
print(f"Erro ao coletar URLs: {e}")
# LÓGICA CORRIGIDA: Sempre tentar enviar dados reais
if window_info and window_info.get('window_title') and window_info.get('process_name'):
# Temos dados reais da janela ativa
window_title = window_info['window_title']
application_name = window_info['process_name']
# Só considerar ocioso se realmente estiver ocioso por muito tempo
if idle_time_seconds > IDLE_THRESHOLD:
# Mesmo ocioso, enviar dados da janela (pode estar minimizada)
send_activity_log(
device_id,
window_title,
application_name,
idle_time_seconds,
urls if urls else None
)
else:
# Usuário ativo, enviar dados reais
send_activity_log(
device_id,
window_title,
application_name,
0,
urls if urls else None
)
else:
# Não conseguiu capturar janela ativa
# Só enviar "System Idle" se realmente estiver ocioso
if idle_time_seconds > IDLE_THRESHOLD:
send_activity_log(
device_id,
'[IDLE]',
'System Idle',
idle_time_seconds,
None
)
else:
# Tentar novamente na próxima iteração
print("⚠️ Não foi possível capturar janela ativa, mas usuário não está ocioso")
def send_heartbeat(device_id):
"""Envia heartbeat para manter dispositivo ativo"""
try:
response = requests.post(
f"{API_URL}/devices/heartbeat",
json={'device_id': device_id},
timeout=10
)
if response.status_code == 200:
print(f"💓 Heartbeat enviado")
except Exception as e:
print(f"❌ Erro ao enviar heartbeat: {e}")
def on_logon():
config = Config()
device_id = config.get('device_id')
if device_id:
send_session_event(device_id, 'logon')
def on_logoff():
config = Config()
device_id = config.get('device_id')
if device_id:
send_session_event(device_id, 'logoff')
def monitor_windows_events():
"""Monitora eventos de logon/logoff do Windows Event Log"""
config = Config()
device_id = config.get('device_id')
if not device_id:
return
try:
# Abrir o log de segurança do Windows
hand = win32evtlog.OpenEventLog(None, "Security")
# Ler eventos recentes (últimos 100 eventos)
flags = win32evtlog.EVENTLOG_BACKWARDS_READ | win32evtlog.EVENTLOG_SEQUENTIAL_READ
events, _ = win32evtlog.ReadEventLog(hand, flags, 0, 100)
# IDs de eventos do Windows para logon/logoff
# 4624 = Logon bem-sucedido
# 4634 = Logoff bem-sucedido
# 4647 = Logoff iniciado pelo usuário
# 4625 = Falha no logon (não vamos usar)
logon_event_ids = [4624]
logoff_event_ids = [4634, 4647]
last_checked_time = config.get('last_event_check_time', 0)
current_time = time.time()
new_events_found = False
for event in events:
try:
event_id = event.EventID
event_time = event.TimeGenerated.timestamp()
# Só processar eventos novos (desde a última verificação)
if event_time <= last_checked_time:
continue
# Verificar se é evento de logon
if event_id in logon_event_ids:
username = None
# Tentar extrair username do evento
try:
event_strings = win32evtlogutil.SafeFormatMessage(event, "Security")
# Procurar por padrões comuns no evento
if 'Account Name:' in event_strings or 'Subject:' in event_strings:
# Extrair username (simplificado)
for line in event_strings.split('\n'):
if 'Account Name:' in line or 'Account:' in line:
parts = line.split(':')
if len(parts) > 1:
username = parts[-1].strip()
break
except:
pass
if not username:
username = os.environ.get('USERNAME', 'Unknown')
print(f"🔐 Evento de LOGON detectado: {username}")
send_session_event(device_id, 'logon')
new_events_found = True
# Verificar se é evento de logoff
elif event_id in logoff_event_ids:
username = os.environ.get('USERNAME', 'Unknown')
print(f"🔐 Evento de LOGOFF detectado: {username}")
send_session_event(device_id, 'logoff')
new_events_found = True
except Exception as e:
# Ignorar erros em eventos individuais
continue
win32evtlog.CloseEventLog(hand)
# Atualizar timestamp da última verificação
if new_events_found or current_time - last_checked_time > 60:
config.set('last_event_check_time', current_time)
except Exception as e:
# Se não conseguir ler eventos (pode ser falta de permissão), usar método alternativo
print(f"⚠️ Não foi possível ler Event Log: {e}")
print(f"⚠️ Tentando método alternativo...")
# Método alternativo: verificar mudanças na sessão atual
try:
current_session_id = win32api.GetCurrentProcess()
last_session_id = config.get('last_session_id')
if last_session_id != str(current_session_id):
if last_session_id:
# Sessão mudou, pode ser logon
print(f"🔐 Possível evento de LOGON detectado (mudança de sessão)")
send_session_event(device_id, 'logon')
config.set('last_session_id', str(current_session_id))
except:
pass
def install_to_program_files():
"""Instala o executável em Program Files"""
try:
# Verificar se já está instalado
current_exe = Path(sys.executable)
if INSTALL_EXE.exists() and current_exe.exists():
try:
if INSTALL_EXE.samefile(current_exe):
print(f"✅ Já instalado em: {INSTALL_DIR}")
return True
except (OSError, ValueError):
# Arquivos diferentes ou erro ao comparar
pass
# Criar diretório se não existir
INSTALL_DIR.mkdir(parents=True, exist_ok=True)
# Copiar executável atual para Program Files
current_exe = Path(sys.executable)
if current_exe.exists():
print(f"📦 Instalando em: {INSTALL_DIR}")
shutil.copy2(current_exe, INSTALL_EXE)
print(f"✅ Instalado com sucesso: {INSTALL_EXE}")
return True
else:
print(f"⚠️ Executável atual não encontrado: {current_exe}")
return False
except PermissionError:
print(f"❌ Erro de permissão ao instalar. Execute como Administrador.")
return False
except Exception as e:
print(f"❌ Erro ao instalar: {e}")
return False
def set_startup_registry():
"""Configura para iniciar automaticamente com o Windows"""
try:
# Caminho do executável instalado COM parâmetro --silent
exe_path = f'"{str(INSTALL_EXE)}" --silent'
# Abrir chave do registro para inicialização automática
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
0,
winreg.KEY_SET_VALUE
)
# Adicionar entrada
winreg.SetValueEx(key, "NoIdle", 0, winreg.REG_SZ, exe_path)
winreg.CloseKey(key)
print(f"✅ Configurado para iniciar automaticamente no boot")
# BACKUP: Criar Task Scheduler também (mais confiável)
try:
create_task_scheduler()
except Exception as e:
print(f"⚠️ Task Scheduler não configurado: {e}")
return True
except PermissionError:
print(f"⚠️ Erro de permissão ao configurar inicialização automática")
return False
except Exception as e:
print(f"❌ Erro ao configurar inicialização: {e}")
return False
def create_task_scheduler():
"""Cria uma tarefa no Task Scheduler para maior confiabilidade"""
try:
task_name = "NoIdle_Monitor"
exe_path = str(INSTALL_EXE)
# Comando PowerShell para criar tarefa agendada
ps_command = f'''
$action = New-ScheduledTaskAction -Execute '"{exe_path}"' -Argument '--silent'
$trigger = New-ScheduledTaskTrigger -AtLogOn -User $env:USERNAME
$principal = New-ScheduledTaskPrincipal -UserId $env:USERNAME -LogonType Interactive -RunLevel Limited
$settings = New-ScheduledTaskSettingsSet -AllowStartIfOnBatteries -DontStopIfGoingOnBatteries -StartWhenAvailable -RestartCount 3 -RestartInterval (New-TimeSpan -Minutes 1)
Register-ScheduledTask -TaskName "{task_name}" -Action $action -Trigger $trigger -Principal $principal -Settings $settings -Force
'''
# Executar PowerShell
result = subprocess.run(
['powershell', '-Command', ps_command],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
print(f"✅ Task Scheduler configurado: {task_name}")
return True
else:
print(f"⚠️ Erro ao criar Task Scheduler: {result.stderr}")
return False
except Exception as e:
print(f"⚠️ Não foi possível criar Task Scheduler: {e}")
return False
def check_and_install():
"""Verifica se precisa instalar e configura inicialização automática"""
config = Config()
# Verificar se já foi instalado
if config.get('installed', False):
# Verificar se o executável ainda existe
if INSTALL_EXE.exists():
return True
else:
# Reinstalar se foi removido
config.set('installed', False)
# Tentar instalar
if install_to_program_files():
# Configurar inicialização automática
set_startup_registry()
config.set('installed', True)
# Se não estamos rodando do Program Files, iniciar a versão instalada
current_exe = Path(sys.executable)
if INSTALL_EXE.exists() and current_exe.exists():
try:
if not INSTALL_EXE.samefile(current_exe):
print(f"🔄 Reiniciando versão instalada...")
try:
import subprocess
subprocess.Popen([str(INSTALL_EXE)], shell=False)
sys.exit(0)
except Exception as e:
print(f"⚠️ Erro ao iniciar versão instalada: {e}")
except (OSError, ValueError):
# Erro ao comparar, assumir que são diferentes
if INSTALL_EXE.exists():
print(f"🔄 Reiniciando versão instalada...")
try:
import subprocess
subprocess.Popen([str(INSTALL_EXE)], shell=False)
sys.exit(0)
except Exception as e:
print(f"⚠️ Erro ao iniciar versão instalada: {e}")
return True
else:
print(f"⚠️ Instalação falhou, continuando da localização atual")
return False
def create_image():
img = Image.new('RGB', (64, 64), color='#2563eb')
draw = ImageDraw.Draw(img)
draw.rectangle([16, 16, 48, 48], fill='white')
return img
def on_quit(icon, item):
on_logoff()
icon.stop()
sys.exit(0)
def create_tray_icon():
menu = pystray.Menu(pystray.MenuItem('Sair', on_quit))
return pystray.Icon("NoIdle", create_image(), "NoIdle - Monitoramento Ativo", menu)
def main():
# Parse argumentos de linha de comando
parser = argparse.ArgumentParser(description='NoIdle - Monitoramento de Atividade')
parser.add_argument('--silent', action='store_true', help='Executar em modo silencioso (sem janela de ativação)')
parser.add_argument('--minimized', action='store_true', help='Iniciar minimizado')
args = parser.parse_args()
silent_mode = args.silent or args.minimized
# Verificar e instalar em Program Files + configurar inicialização automática
check_and_install()
config = Config()
# Se não tem device_id e não está em modo silencioso, mostrar janela de ativação
if not config.has('device_id'):
if silent_mode:
# Modo silencioso mas não configurado - sair e aguardar configuração manual
print("⚠️ Dispositivo não ativado. Execute sem --silent para ativar.")
sys.exit(0)
else:
result = ActivationWindow().show()
if not result or not result.get('success'):
print("❌ Ativação cancelada ou falhou")
sys.exit(1)
device_id = result.get('device_id')
if device_id:
config.set('device_id', device_id)
else:
print("❌ Device ID não recebido")
sys.exit(1)
device_id = config.get('device_id')
print(f"✅ NoIdle iniciado - Device ID: {device_id}")
on_logon()
atexit.register(on_logoff)
# Monitorar atividade a cada 10 segundos
schedule.every(MONITOR_INTERVAL).seconds.do(monitor_activity)
# Enviar heartbeat a cada 30 segundos
schedule.every(30).seconds.do(lambda: send_heartbeat(device_id))
# Monitorar eventos de logon/logoff do Windows a cada 60 segundos
schedule.every(60).seconds.do(monitor_windows_events)
# Primeira execução imediata
monitor_activity()
send_heartbeat(device_id)
def run_schedule():
while True:
schedule.run_pending()
time.sleep(1)
Thread(target=run_schedule, daemon=True).start()
# Se estiver em modo silencioso, não mostrar tray icon (apenas rodar em background)
if silent_mode:
print("🔇 Modo silencioso ativado - Rodando em segundo plano")
# Manter o programa rodando sem tray icon
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
print("❌ Encerrando...")
on_logoff()
sys.exit(0)
else:
create_tray_icon().run()
if __name__ == '__main__':
main()