feat: Implementar funcionalidades de Tarefas e Saúde

- Criadas APIs para Health (métricas de saúde)
  * Registrar peso, altura, % gordura, medidas
  * Histórico completo de medições
  * Estatísticas e resumo

- Criadas APIs para Tasks (tarefas)
  * Criar, editar e deletar tarefas
  * Filtros por status e data
  * Estatísticas detalhadas
  * Prioridades (baixa, média, alta)

- Frontend implementado:
  * Página Health.tsx - registro de métricas
  * Página Tasks.tsx - gerenciamento de tarefas
  * Página Progress.tsx - visualização de progresso
  * Dashboard integrado com estatísticas reais

- Schemas e modelos atualizados
- Todas as funcionalidades testadas e operacionais
This commit is contained in:
Sergio Correa
2025-11-22 02:33:15 +00:00
commit f50174f898
68 changed files with 6835 additions and 0 deletions

0
backend/app/__init__.py Normal file
View File

View File

@@ -0,0 +1,4 @@
from . import auth, habits, health, messages, admin, tasks
__all__ = ['auth', 'habits', 'health', 'messages', 'admin', 'tasks']

148
backend/app/api/admin.py Normal file
View File

@@ -0,0 +1,148 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from typing import List
from datetime import datetime
from app.core.database import get_db
from app.core.security import decode_access_token, get_password_hash
from app.models.user import User
from app.schemas.admin import UserListResponse, UserUpdateRequest, PasswordChangeRequest
router = APIRouter(prefix="/admin", tags=["admin"])
security = HTTPBearer()
def get_current_superadmin(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> User:
"""Verifica se usuário é superadmin"""
token = credentials.credentials
payload = decode_access_token(token)
if not payload:
raise HTTPException(status_code=401, detail="Token inválido")
user_id = payload.get("sub")
user = db.query(User).filter(User.id == user_id).first()
if not user or not user.is_superadmin:
raise HTTPException(
status_code=403,
detail="Acesso negado. Apenas superadmin."
)
return user
@router.get("/users", response_model=List[UserListResponse])
async def list_all_users(
admin: User = Depends(get_current_superadmin),
db: Session = Depends(get_db)
):
"""Listar todos os usuários (apenas superadmin)"""
users = db.query(User).order_by(User.created_at.desc()).all()
return users
@router.get("/stats")
async def get_admin_stats(
admin: User = Depends(get_current_superadmin),
db: Session = Depends(get_db)
):
"""Estatísticas gerais do sistema"""
from sqlalchemy import func
from app.models.habit import Habit, HabitCompletion
total_users = db.query(func.count(User.id)).scalar()
active_users = db.query(func.count(User.id)).filter(User.is_active == True).scalar()
total_habits = db.query(func.count(Habit.id)).scalar()
total_completions = db.query(func.count(HabitCompletion.id)).scalar()
return {
"total_users": total_users,
"active_users": active_users,
"inactive_users": total_users - active_users,
"total_habits": total_habits,
"total_completions": total_completions
}
@router.patch("/users/{user_id}")
async def update_user(
user_id: str,
data: UserUpdateRequest,
admin: User = Depends(get_current_superadmin),
db: Session = Depends(get_db)
):
"""Atualizar dados de usuário"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Usuário não encontrado")
# Atualizar campos
if data.email is not None:
# Verificar se email já existe
existing = db.query(User).filter(
User.email == data.email,
User.id != user_id
).first()
if existing:
raise HTTPException(status_code=400, detail="Email já em uso")
user.email = data.email
if data.username is not None:
# Verificar se username já existe
existing = db.query(User).filter(
User.username == data.username,
User.id != user_id
).first()
if existing:
raise HTTPException(status_code=400, detail="Username já em uso")
user.username = data.username
if data.full_name is not None:
user.full_name = data.full_name
if data.is_active is not None:
user.is_active = data.is_active
if data.is_verified is not None:
user.is_verified = data.is_verified
db.commit()
db.refresh(user)
return {"message": "Usuário atualizado", "user": user}
@router.delete("/users/{user_id}")
async def delete_user(
user_id: str,
admin: User = Depends(get_current_superadmin),
db: Session = Depends(get_db)
):
"""Deletar usuário (soft delete - apenas desativa)"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Usuário não encontrado")
if user.is_superadmin:
raise HTTPException(status_code=403, detail="Não é possível deletar superadmin")
user.is_active = False
db.commit()
return {"message": "Usuário desativado com sucesso"}
@router.post("/users/{user_id}/reset-password")
async def reset_user_password(
user_id: str,
data: PasswordChangeRequest,
admin: User = Depends(get_current_superadmin),
db: Session = Depends(get_db)
):
"""Resetar senha de usuário"""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Usuário não encontrado")
user.password_hash = get_password_hash(data.new_password)
db.commit()
return {"message": "Senha alterada com sucesso"}

56
backend/app/api/auth.py Normal file
View File

@@ -0,0 +1,56 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.core.security import get_password_hash, verify_password, create_access_token
from app.models.user import User
from app.schemas.user import UserCreate, UserLogin, Token, UserResponse
from datetime import datetime
router = APIRouter()
@router.post("/register", response_model=Token)
def register(user: UserCreate, db: Session = Depends(get_db)):
if db.query(User).filter(User.email == user.email).first():
raise HTTPException(status_code=400, detail="Email already registered")
if db.query(User).filter(User.username == user.username).first():
raise HTTPException(status_code=400, detail="Username already taken")
db_user = User(
email=user.email,
username=user.username,
password_hash=get_password_hash(user.password),
full_name=user.full_name,
phone=user.phone,
is_verified=True # SEM verificação de email
)
db.add(db_user)
db.commit()
db.refresh(db_user)
access_token = create_access_token(data={"sub": db_user.email})
return {
"access_token": access_token,
"token_type": "bearer",
"user": UserResponse.from_orm(db_user)
}
@router.post("/login", response_model=Token)
def login(credentials: UserLogin, db: Session = Depends(get_db)):
user = db.query(User).filter(User.email == credentials.email).first()
if not user or not verify_password(credentials.password, user.password_hash):
raise HTTPException(status_code=401, detail="Invalid credentials")
user.last_login_at = datetime.utcnow()
db.commit()
access_token = create_access_token(data={"sub": user.email})
return {
"access_token": access_token,
"token_type": "bearer",
"user": UserResponse.from_orm(user)
}

155
backend/app/api/habits.py Normal file
View File

@@ -0,0 +1,155 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from typing import List
from datetime import date, datetime
from app.core.database import get_db
from app.core.security import decode_access_token
from app.models.user import User
from app.models.habit import Habit, HabitCompletion
from app.schemas.habit import HabitCreate, HabitResponse, HabitCompletionCreate
router = APIRouter(prefix="/habits", tags=["habits"])
security = HTTPBearer()
def get_current_user_id(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
"""Extrai user_id do token JWT"""
token = credentials.credentials
payload = decode_access_token(token)
if not payload:
raise HTTPException(status_code=401, detail="Token inválido")
return payload.get("sub")
@router.post("/", response_model=HabitResponse, status_code=status.HTTP_201_CREATED)
async def create_habit(
habit_data: HabitCreate,
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Criar novo hábito"""
new_habit = Habit(
user_id=user_id,
name=habit_data.name,
description=habit_data.description,
frequency_type=habit_data.frequency_type,
target_count=habit_data.target_count,
reminder_time=habit_data.reminder_time,
start_date=habit_data.start_date or date.today()
)
db.add(new_habit)
db.commit()
db.refresh(new_habit)
return new_habit
@router.get("/", response_model=List[HabitResponse])
async def list_habits(
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Listar todos os hábitos do usuário"""
habits = db.query(Habit).filter(
Habit.user_id == user_id,
Habit.is_active == True
).all()
return habits
@router.post("/{habit_id}/complete", status_code=status.HTTP_201_CREATED)
async def complete_habit(
habit_id: str,
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Marcar hábito como completo para hoje"""
# Verificar se já completou hoje
today = date.today()
existing = db.query(HabitCompletion).filter(
HabitCompletion.habit_id == habit_id,
HabitCompletion.user_id == user_id,
HabitCompletion.completion_date == today
).first()
if existing:
raise HTTPException(status_code=400, detail="Hábito já completado hoje")
# Criar completion
completion = HabitCompletion(
habit_id=habit_id,
user_id=user_id,
completion_date=today,
completion_time=datetime.now().time()
)
db.add(completion)
db.commit()
return {"message": "Hábito completado!", "date": today}
@router.delete("/{habit_id}/complete")
async def uncomplete_habit(
habit_id: str,
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Desmarcar hábito de hoje"""
today = date.today()
completion = db.query(HabitCompletion).filter(
HabitCompletion.habit_id == habit_id,
HabitCompletion.user_id == user_id,
HabitCompletion.completion_date == today
).first()
if not completion:
raise HTTPException(status_code=404, detail="Completion não encontrado")
db.delete(completion)
db.commit()
return {"message": "Completion removido"}
@router.get("/stats")
async def get_habit_stats(
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Estatísticas de hábitos"""
from sqlalchemy import func
total_habits = db.query(func.count(Habit.id)).filter(
Habit.user_id == user_id,
Habit.is_active == True
).scalar()
completed_today = db.query(func.count(HabitCompletion.id)).filter(
HabitCompletion.user_id == user_id,
HabitCompletion.completion_date == date.today()
).scalar()
return {
"total_habits": total_habits,
"completed_today": completed_today,
"pending_today": total_habits - completed_today
}
@router.delete("/{habit_id}")
async def delete_habit(
habit_id: str,
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Deletar hábito"""
habit = db.query(Habit).filter(
Habit.id == habit_id,
Habit.user_id == user_id
).first()
if not habit:
raise HTTPException(status_code=404, detail="Hábito não encontrado")
db.delete(habit)
db.commit()
return {"message": "Hábito deletado"}

169
backend/app/api/health.py Normal file
View File

@@ -0,0 +1,169 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from typing import List
from datetime import date
from app.core.database import get_db
from app.core.security import decode_access_token
from app.models.health import HealthMetric
from app.schemas.health import HealthMetricCreate, HealthMetricResponse, HealthMetricUpdate
router = APIRouter()
security = HTTPBearer()
def get_current_user_id(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
"""Extrai user_id do token JWT"""
token = credentials.credentials
payload = decode_access_token(token)
if not payload:
raise HTTPException(status_code=401, detail="Token inválido")
return payload.get("sub")
@router.post("/", response_model=HealthMetricResponse, status_code=status.HTTP_201_CREATED)
async def create_health_metric(
metric_data: HealthMetricCreate,
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Criar nova métrica de saúde"""
new_metric = HealthMetric(
user_id=user_id,
measurement_date=metric_data.measurement_date,
measurement_time=metric_data.measurement_time,
weight=metric_data.weight,
height=metric_data.height,
body_fat_percentage=metric_data.body_fat_percentage,
muscle_mass=metric_data.muscle_mass,
waist=metric_data.waist,
chest=metric_data.chest,
hips=metric_data.hips,
notes=metric_data.notes
)
db.add(new_metric)
db.commit()
db.refresh(new_metric)
return new_metric
@router.get("/", response_model=List[HealthMetricResponse])
async def list_health_metrics(
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db),
limit: int = 100
):
"""Listar todas as métricas de saúde do usuário"""
metrics = db.query(HealthMetric).filter(
HealthMetric.user_id == user_id
).order_by(HealthMetric.measurement_date.desc()).limit(limit).all()
return metrics
@router.get("/latest", response_model=HealthMetricResponse)
async def get_latest_metric(
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Obter última métrica registrada"""
metric = db.query(HealthMetric).filter(
HealthMetric.user_id == user_id
).order_by(HealthMetric.measurement_date.desc()).first()
if not metric:
raise HTTPException(status_code=404, detail="Nenhuma métrica encontrada")
return metric
@router.get("/{metric_id}", response_model=HealthMetricResponse)
async def get_health_metric(
metric_id: str,
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Obter métrica específica"""
metric = db.query(HealthMetric).filter(
HealthMetric.id == metric_id,
HealthMetric.user_id == user_id
).first()
if not metric:
raise HTTPException(status_code=404, detail="Métrica não encontrada")
return metric
@router.put("/{metric_id}", response_model=HealthMetricResponse)
async def update_health_metric(
metric_id: str,
metric_data: HealthMetricUpdate,
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Atualizar métrica de saúde"""
metric = db.query(HealthMetric).filter(
HealthMetric.id == metric_id,
HealthMetric.user_id == user_id
).first()
if not metric:
raise HTTPException(status_code=404, detail="Métrica não encontrada")
# Atualizar campos
update_data = metric_data.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(metric, field, value)
db.commit()
db.refresh(metric)
return metric
@router.delete("/{metric_id}")
async def delete_health_metric(
metric_id: str,
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Deletar métrica de saúde"""
metric = db.query(HealthMetric).filter(
HealthMetric.id == metric_id,
HealthMetric.user_id == user_id
).first()
if not metric:
raise HTTPException(status_code=404, detail="Métrica não encontrada")
db.delete(metric)
db.commit()
return {"message": "Métrica deletada com sucesso"}
@router.get("/stats/summary")
async def get_health_stats(
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Estatísticas e resumo de saúde"""
latest = db.query(HealthMetric).filter(
HealthMetric.user_id == user_id
).order_by(HealthMetric.measurement_date.desc()).first()
# Pegar primeira e última medição para calcular diferença
first = db.query(HealthMetric).filter(
HealthMetric.user_id == user_id,
HealthMetric.weight.isnot(None)
).order_by(HealthMetric.measurement_date.asc()).first()
stats = {
"current_weight": float(latest.weight) if latest and latest.weight else None,
"current_height": float(latest.height) if latest and latest.height else None,
"current_body_fat": float(latest.body_fat_percentage) if latest and latest.body_fat_percentage else None,
"weight_change": None,
"total_measurements": db.query(HealthMetric).filter(HealthMetric.user_id == user_id).count()
}
if latest and first and latest.weight and first.weight:
stats["weight_change"] = float(latest.weight - first.weight)
return stats

View File

@@ -0,0 +1,93 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import Dict
from app.core.database import get_db
from app.services.message_service import MessageService
router = APIRouter(prefix="/messages", tags=["messages"])
@router.get("/daily", response_model=Dict)
async def get_daily_message(
user_id: str, # TODO: Pegar do JWT token quando implementar autenticação
db: Session = Depends(get_db)
):
"""
Retorna a mensagem motivacional do dia para o usuário.
A mensagem é contextual baseada em:
- Streaks e conquistas
- Inatividade
- Lembretes
- Progresso
- Hora do dia
"""
try:
message_service = MessageService(db)
message = message_service.get_message_of_the_day(user_id)
return {
"success": True,
"message": message
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/click/{message_id}")
async def mark_message_clicked(
message_id: str,
user_id: str,
db: Session = Depends(get_db)
):
"""Marca mensagem como clicada pelo usuário"""
from app.models.message import UserMessageLog
from datetime import datetime
try:
log = db.query(UserMessageLog).filter(
UserMessageLog.id == message_id,
UserMessageLog.user_id == user_id
).first()
if log:
log.was_clicked = True
log.clicked_at = datetime.now()
db.commit()
return {"success": True, "message": "Click registrado"}
else:
raise HTTPException(status_code=404, detail="Mensagem não encontrada")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/history")
async def get_message_history(
user_id: str,
limit: int = 10,
db: Session = Depends(get_db)
):
"""Retorna histórico de mensagens do usuário"""
from app.models.message import UserMessageLog
try:
logs = db.query(UserMessageLog).filter(
UserMessageLog.user_id == user_id
).order_by(UserMessageLog.shown_at.desc()).limit(limit).all()
return {
"success": True,
"messages": [
{
"id": str(log.id),
"message_text": log.message_text,
"message_type": log.message_type,
"shown_at": log.shown_at.isoformat(),
"was_clicked": log.was_clicked
}
for log in logs
]
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

218
backend/app/api/tasks.py Normal file
View File

@@ -0,0 +1,218 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from typing import List, Optional
from datetime import date
from app.core.database import get_db
from app.core.security import decode_access_token
from app.models.task import Task
from app.schemas.task import TaskCreate, TaskResponse, TaskUpdate
router = APIRouter()
security = HTTPBearer()
def get_current_user_id(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
"""Extrai user_id do token JWT"""
token = credentials.credentials
payload = decode_access_token(token)
if not payload:
raise HTTPException(status_code=401, detail="Token inválido")
return payload.get("sub")
@router.post("/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
async def create_task(
task_data: TaskCreate,
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Criar nova tarefa"""
new_task = Task(
user_id=user_id,
title=task_data.title,
description=task_data.description,
priority=task_data.priority,
status=task_data.status,
due_date=task_data.due_date,
due_time=task_data.due_time,
category_id=task_data.category_id
)
db.add(new_task)
db.commit()
db.refresh(new_task)
return new_task
@router.get("/", response_model=List[TaskResponse])
async def list_tasks(
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db),
status_filter: Optional[str] = None,
include_archived: bool = False
):
"""Listar todas as tarefas do usuário"""
query = db.query(Task).filter(Task.user_id == user_id)
if not include_archived:
query = query.filter(Task.is_archived == False)
if status_filter:
query = query.filter(Task.status == status_filter)
tasks = query.order_by(Task.due_date.asc()).all()
return tasks
@router.get("/today", response_model=List[TaskResponse])
async def get_today_tasks(
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Obter tarefas de hoje"""
today = date.today()
tasks = db.query(Task).filter(
Task.user_id == user_id,
Task.due_date == today,
Task.is_archived == False
).order_by(Task.due_time.asc()).all()
return tasks
@router.get("/{task_id}", response_model=TaskResponse)
async def get_task(
task_id: str,
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Obter tarefa específica"""
task = db.query(Task).filter(
Task.id == task_id,
Task.user_id == user_id
).first()
if not task:
raise HTTPException(status_code=404, detail="Tarefa não encontrada")
return task
@router.put("/{task_id}", response_model=TaskResponse)
async def update_task(
task_id: str,
task_data: TaskUpdate,
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Atualizar tarefa"""
task = db.query(Task).filter(
Task.id == task_id,
Task.user_id == user_id
).first()
if not task:
raise HTTPException(status_code=404, detail="Tarefa não encontrada")
# Atualizar campos
update_data = task_data.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(task, field, value)
db.commit()
db.refresh(task)
return task
@router.patch("/{task_id}/status")
async def update_task_status(
task_id: str,
status: str,
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Atualizar status da tarefa"""
task = db.query(Task).filter(
Task.id == task_id,
Task.user_id == user_id
).first()
if not task:
raise HTTPException(status_code=404, detail="Tarefa não encontrada")
task.status = status
db.commit()
return {"message": "Status atualizado", "status": status}
@router.delete("/{task_id}")
async def delete_task(
task_id: str,
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Deletar tarefa"""
task = db.query(Task).filter(
Task.id == task_id,
Task.user_id == user_id
).first()
if not task:
raise HTTPException(status_code=404, detail="Tarefa não encontrada")
db.delete(task)
db.commit()
return {"message": "Tarefa deletada com sucesso"}
@router.get("/stats/summary")
async def get_task_stats(
user_id: str = Depends(get_current_user_id),
db: Session = Depends(get_db)
):
"""Estatísticas de tarefas"""
from sqlalchemy import func
total_tasks = db.query(func.count(Task.id)).filter(
Task.user_id == user_id,
Task.is_archived == False
).scalar()
pending = db.query(func.count(Task.id)).filter(
Task.user_id == user_id,
Task.status == "pending",
Task.is_archived == False
).scalar()
in_progress = db.query(func.count(Task.id)).filter(
Task.user_id == user_id,
Task.status == "in_progress",
Task.is_archived == False
).scalar()
completed = db.query(func.count(Task.id)).filter(
Task.user_id == user_id,
Task.status == "completed",
Task.is_archived == False
).scalar()
today = date.today()
today_tasks = db.query(func.count(Task.id)).filter(
Task.user_id == user_id,
Task.due_date == today,
Task.is_archived == False
).scalar()
today_completed = db.query(func.count(Task.id)).filter(
Task.user_id == user_id,
Task.due_date == today,
Task.status == "completed"
).scalar()
return {
"total_tasks": total_tasks,
"pending": pending,
"in_progress": in_progress,
"completed": completed,
"today_tasks": today_tasks,
"today_completed": today_completed
}

View File

View File

@@ -0,0 +1,13 @@
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
DATABASE_URL: str
REDIS_URL: str
SECRET_KEY: str
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 10080
class Config:
env_file = ".env"
settings = Settings()

View File

@@ -0,0 +1,21 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os
DATABASE_URL = os.getenv(
"DATABASE_URL",
"postgresql://vida180_user:vida180_strong_password_2024@postgres:5432/vida180_db"
)
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

View File

@@ -0,0 +1,41 @@
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from app.core.config import settings
# Usar Argon2 ao invés de Bcrypt
ph = PasswordHasher()
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verifica se a senha corresponde ao hash"""
try:
ph.verify(hashed_password, plain_password)
return True
except VerifyMismatchError:
return False
def get_password_hash(password: str) -> str:
"""Gera hash da senha"""
return ph.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Cria token JWT"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def decode_access_token(token: str) -> Optional[dict]:
"""Decodifica token JWT"""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
return None

27
backend/app/main.py Normal file
View File

@@ -0,0 +1,27 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.database import engine, Base
from app.api import auth, habits, health, messages, admin, tasks
app = FastAPI(title="Vida180 API")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
Base.metadata.create_all(bind=engine)
app.include_router(auth.router, prefix="/api/v1/auth", tags=["auth"])
app.include_router(habits.router, prefix="/api/v1/habits", tags=["habits"])
app.include_router(health.router, prefix="/api/v1/health", tags=["health"])
app.include_router(messages.router, prefix="/api/v1/messages", tags=["messages"])
app.include_router(admin.router, prefix="/api/v1/admin", tags=["admin"])
app.include_router(tasks.router, prefix="/api/v1/tasks", tags=["tasks"])
@app.get("/")
def root():
return {"message": "Vida180 API"}

View File

@@ -0,0 +1,5 @@
-- Adicionar campo is_superadmin
ALTER TABLE users ADD COLUMN IF NOT EXISTS is_superadmin BOOLEAN DEFAULT FALSE;
-- Criar índice
CREATE INDEX IF NOT EXISTS idx_users_superadmin ON users(is_superadmin);

View File

@@ -0,0 +1 @@
# Models package

View File

@@ -0,0 +1,34 @@
from sqlalchemy import Column, String, Integer, Boolean, Date, Time, DateTime, ForeignKey, Text
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.sql import func
import uuid
from app.core.database import Base
class Habit(Base):
__tablename__ = "habits"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
user_id = Column(UUID(as_uuid=True), ForeignKey('users.id', ondelete='CASCADE'), nullable=False, index=True)
name = Column(String(255), nullable=False)
description = Column(Text)
frequency_type = Column(String(50), default='daily')
target_count = Column(Integer, default=1)
reminder_time = Column(Time)
start_date = Column(Date, default=func.current_date())
end_date = Column(Date)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
class HabitCompletion(Base):
__tablename__ = "habit_completions"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
habit_id = Column(UUID(as_uuid=True), ForeignKey('habits.id', ondelete='CASCADE'), nullable=False, index=True)
user_id = Column(UUID(as_uuid=True), ForeignKey('users.id', ondelete='CASCADE'), nullable=False, index=True)
completion_date = Column(Date, nullable=False, index=True)
completion_time = Column(Time)
notes = Column(Text)
quality_rating = Column(Integer)
created_at = Column(DateTime(timezone=True), server_default=func.now())

View File

@@ -0,0 +1,21 @@
from sqlalchemy import Column, String, Date, Time, Numeric, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
import uuid
from app.core.database import Base
class HealthMetric(Base):
__tablename__ = "health_metrics"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
measurement_date = Column(Date, nullable=False)
measurement_time = Column(Time)
weight = Column(Numeric(5, 2))
height = Column(Numeric(5, 2))
body_fat_percentage = Column(Numeric(4, 2))
muscle_mass = Column(Numeric(5, 2))
waist = Column(Numeric(5, 2))
chest = Column(Numeric(5, 2))
hips = Column(Numeric(5, 2))
notes = Column(String)

View File

@@ -0,0 +1,31 @@
from sqlalchemy import Column, String, Text, Integer, Boolean, DateTime, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.sql import func
import uuid
from app.core.database import Base
class MotivationalMessage(Base):
__tablename__ = "motivational_messages"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
message_type = Column(String(50), nullable=False)
trigger_condition = Column(String(100), nullable=False)
message_text = Column(Text, nullable=False)
icon = Column(String(10))
priority = Column(Integer, default=0)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
class UserMessageLog(Base):
__tablename__ = "user_messages_log"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
message_id = Column(UUID(as_uuid=True), ForeignKey("motivational_messages.id", ondelete="SET NULL"))
message_text = Column(Text, nullable=False)
message_type = Column(String(50))
shown_at = Column(DateTime(timezone=True), server_default=func.now())
was_clicked = Column(Boolean, default=False)
clicked_at = Column(DateTime(timezone=True))

View File

@@ -0,0 +1,16 @@
from sqlalchemy import Column, Integer, Date, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
import uuid
from app.core.database import Base
class Streak(Base):
__tablename__ = "streaks"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
habit_id = Column(UUID(as_uuid=True), ForeignKey("habits.id", ondelete="CASCADE"), nullable=False)
user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
current_streak = Column(Integer, default=0)
longest_streak = Column(Integer, default=0)
last_completion_date = Column(Date)
total_completions = Column(Integer, default=0)

View File

@@ -0,0 +1,21 @@
from sqlalchemy import Column, String, Date, Time, Boolean, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.sql import func
from datetime import datetime
import uuid
from app.core.database import Base
class Task(Base):
__tablename__ = "tasks"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
category_id = Column(UUID(as_uuid=True), nullable=True) # Removido FK para categories por enquanto
title = Column(String(255), nullable=False)
description = Column(String)
priority = Column(String(20), default='medium')
status = Column(String(20), default='pending')
due_date = Column(Date)
due_time = Column(Time)
is_archived = Column(Boolean, default=False)

View File

@@ -0,0 +1,25 @@
from sqlalchemy import Column, String, Boolean, DateTime
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.sql import func
import uuid
from app.core.database import Base
class User(Base):
__tablename__ = "users"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
email = Column(String(255), unique=True, nullable=False, index=True)
username = Column(String(100), unique=True, nullable=False, index=True)
password_hash = Column(String(255), nullable=False)
full_name = Column(String(255))
phone = Column(String(20)) # NOVO
avatar_url = Column(String)
timezone = Column(String(50), default='America/Sao_Paulo')
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
last_login_at = Column(DateTime(timezone=True))
is_active = Column(Boolean, default=True)
is_verified = Column(Boolean, default=False)
is_superadmin = Column(Boolean, default=False)
verification_token = Column(String(255)) # NOVO - para confirmação de email

View File

View File

@@ -0,0 +1,28 @@
from pydantic import BaseModel, EmailStr
from typing import Optional
from datetime import datetime
import uuid
class UserListResponse(BaseModel):
id: uuid.UUID
email: EmailStr
username: str
full_name: Optional[str]
is_active: bool
is_verified: bool
is_superadmin: bool
created_at: datetime
last_login_at: Optional[datetime]
class Config:
from_attributes = True
class UserUpdateRequest(BaseModel):
email: Optional[EmailStr] = None
username: Optional[str] = None
full_name: Optional[str] = None
is_active: Optional[bool] = None
is_verified: Optional[bool] = None
class PasswordChangeRequest(BaseModel):
new_password: str

View File

@@ -0,0 +1,28 @@
from pydantic import BaseModel
from typing import Optional
from datetime import date, time
import uuid
class HabitCreate(BaseModel):
name: str
description: Optional[str] = None
frequency_type: str = "daily"
target_count: int = 1
reminder_time: Optional[time] = None
start_date: Optional[date] = None
class HabitResponse(BaseModel):
id: uuid.UUID
name: str
description: Optional[str]
frequency_type: str
target_count: int
is_active: bool
start_date: date
class Config:
from_attributes = True
class HabitCompletionCreate(BaseModel):
notes: Optional[str] = None
quality_rating: Optional[int] = None

View File

@@ -0,0 +1,47 @@
from pydantic import BaseModel
from typing import Optional
from datetime import date, time
from decimal import Decimal
import uuid
class HealthMetricCreate(BaseModel):
measurement_date: date
measurement_time: Optional[time] = None
weight: Optional[Decimal] = None
height: Optional[Decimal] = None
body_fat_percentage: Optional[Decimal] = None
muscle_mass: Optional[Decimal] = None
waist: Optional[Decimal] = None
chest: Optional[Decimal] = None
hips: Optional[Decimal] = None
notes: Optional[str] = None
class HealthMetricResponse(BaseModel):
id: uuid.UUID
user_id: uuid.UUID
measurement_date: date
measurement_time: Optional[time]
weight: Optional[Decimal]
height: Optional[Decimal]
body_fat_percentage: Optional[Decimal]
muscle_mass: Optional[Decimal]
waist: Optional[Decimal]
chest: Optional[Decimal]
hips: Optional[Decimal]
notes: Optional[str]
class Config:
from_attributes = True
class HealthMetricUpdate(BaseModel):
measurement_date: Optional[date] = None
measurement_time: Optional[time] = None
weight: Optional[Decimal] = None
height: Optional[Decimal] = None
body_fat_percentage: Optional[Decimal] = None
muscle_mass: Optional[Decimal] = None
waist: Optional[Decimal] = None
chest: Optional[Decimal] = None
hips: Optional[Decimal] = None
notes: Optional[str] = None

View File

@@ -0,0 +1,39 @@
from pydantic import BaseModel
from typing import Optional
from datetime import date, time
import uuid
class TaskCreate(BaseModel):
title: str
description: Optional[str] = None
priority: str = "medium" # low, medium, high
status: str = "pending" # pending, in_progress, completed
due_date: Optional[date] = None
due_time: Optional[time] = None
category_id: Optional[uuid.UUID] = None
class TaskResponse(BaseModel):
id: uuid.UUID
user_id: uuid.UUID
title: str
description: Optional[str]
priority: str
status: str
due_date: Optional[date]
due_time: Optional[time]
is_archived: bool
category_id: Optional[uuid.UUID]
class Config:
from_attributes = True
class TaskUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
priority: Optional[str] = None
status: Optional[str] = None
due_date: Optional[date] = None
due_time: Optional[time] = None
is_archived: Optional[bool] = None
category_id: Optional[uuid.UUID] = None

View File

@@ -0,0 +1,53 @@
from pydantic import BaseModel, EmailStr, field_validator
from typing import Optional
import uuid
import re
class UserBase(BaseModel):
email: EmailStr
username: str
full_name: str
phone: str
class UserCreate(UserBase):
password: str
@field_validator('phone')
@classmethod
def validate_phone(cls, v):
# Remove tudo que não é número
phone = re.sub(r'\D', '', v)
if len(phone) < 10 or len(phone) > 11:
raise ValueError('Telefone deve ter 10 ou 11 dígitos')
return phone
@field_validator('full_name')
@classmethod
def validate_full_name(cls, v):
if not v or len(v.strip()) < 3:
raise ValueError('Nome completo deve ter no mínimo 3 caracteres')
return v.strip()
class UserLogin(BaseModel):
email: EmailStr
password: str
class UserResponse(BaseModel):
id: uuid.UUID
email: EmailStr
username: str
full_name: Optional[str]
phone: Optional[str]
is_superadmin: bool = False
is_verified: bool = False
class Config:
from_attributes = True
class Token(BaseModel):
access_token: str
token_type: str = "bearer"
user: UserResponse
class TokenData(BaseModel):
sub: str

View File

View File

@@ -0,0 +1,246 @@
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail, Email, To, Content
import logging
from app.core.config import settings
logger = logging.getLogger(__name__)
def send_verification_email(to_email: str, username: str, verification_token: str):
"""Envia email de verificação para novo usuário"""
verification_url = f"{settings.FRONTEND_URL}/verify-email/{verification_token}"
# Template do email
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<style>
body {{
font-family: 'Arial', sans-serif;
background-color: #f3f4f6;
padding: 20px;
}}
.container {{
max-width: 600px;
margin: 0 auto;
background: white;
border-radius: 16px;
padding: 40px;
box-shadow: 0 10px 30px rgba(0,0,0,0.1);
}}
.header {{
text-align: center;
margin-bottom: 30px;
}}
.logo {{
font-size: 2.5rem;
font-weight: 800;
background: linear-gradient(135deg, #10b981 0%, #059669 100%);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
margin: 0;
}}
.content {{
color: #374151;
line-height: 1.6;
}}
.button {{
display: inline-block;
margin: 30px 0;
padding: 15px 40px;
background: linear-gradient(135deg, #10b981 0%, #059669 100%);
color: white !important;
text-decoration: none;
border-radius: 12px;
font-weight: 700;
font-size: 16px;
}}
.footer {{
margin-top: 40px;
padding-top: 20px;
border-top: 2px solid #e5e7eb;
color: #6b7280;
font-size: 14px;
text-align: center;
}}
.highlight {{
color: #10b981;
font-weight: 700;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1 class="logo">🚀 VIDA180</h1>
</div>
<div class="content">
<h2>Olá, <span class="highlight">{username}</span>! 👋</h2>
<p>Seja muito bem-vindo(a) ao <strong>Vida180</strong>! 🎉</p>
<p>Estamos muito felizes em ter você conosco nessa jornada de transformação!</p>
<p>Para ativar sua conta e começar a usar todas as funcionalidades, clique no botão abaixo:</p>
<div style="text-align: center;">
<a href="{verification_url}" class="button">
✅ Verificar Meu Email
</a>
</div>
<p style="color: #6b7280; font-size: 14px;">
Ou copie e cole este link no seu navegador:<br>
<a href="{verification_url}" style="color: #10b981;">{verification_url}</a>
</p>
<p>Após verificar seu email, você terá acesso completo a:</p>
<ul>
<li>🎯 <strong>Hábitos:</strong> Construa rotinas poderosas</li>
<li>📝 <strong>Tarefas:</strong> Organize seu dia</li>
<li>💪 <strong>Saúde:</strong> Acompanhe sua evolução física</li>
<li>📊 <strong>Progresso:</strong> Visualize sua transformação</li>
</ul>
<p><strong>A transformação acontece um dia de cada vez!</strong> 💪</p>
</div>
<div class="footer">
<p>
Este email foi enviado porque você se cadastrou no Vida180.<br>
Se você não criou esta conta, pode ignorar este email.
</p>
<p style="margin-top: 20px;">
<strong>Vida180</strong> - Transforme sua vida, um dia de cada vez 🚀
</p>
</div>
</div>
</body>
</html>
"""
try:
message = Mail(
from_email=Email(settings.SENDGRID_FROM_EMAIL, settings.SENDGRID_FROM_NAME),
to_emails=To(to_email),
subject=f"🚀 Bem-vindo ao Vida180! Confirme seu email",
html_content=Content("text/html", html_content)
)
sg = SendGridAPIClient(settings.SENDGRID_API_KEY)
response = sg.send(message)
logger.info(f"Email enviado para {to_email} - Status: {response.status_code}")
return True
except Exception as e:
logger.error(f"Erro ao enviar email para {to_email}: {str(e)}")
return False
def send_password_reset_email(to_email: str, username: str, reset_token: str):
"""Envia email de reset de senha"""
reset_url = f"{settings.FRONTEND_URL}/reset-password/{reset_token}"
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<style>
body {{
font-family: 'Arial', sans-serif;
background-color: #f3f4f6;
padding: 20px;
}}
.container {{
max-width: 600px;
margin: 0 auto;
background: white;
border-radius: 16px;
padding: 40px;
box-shadow: 0 10px 30px rgba(0,0,0,0.1);
}}
.header {{
text-align: center;
margin-bottom: 30px;
}}
.logo {{
font-size: 2.5rem;
font-weight: 800;
background: linear-gradient(135deg, #10b981 0%, #059669 100%);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
margin: 0;
}}
.button {{
display: inline-block;
margin: 30px 0;
padding: 15px 40px;
background: linear-gradient(135deg, #f97316 0%, #ea580c 100%);
color: white !important;
text-decoration: none;
border-radius: 12px;
font-weight: 700;
}}
.alert {{
background: #fef3c7;
border-left: 4px solid #f59e0b;
padding: 15px;
margin: 20px 0;
border-radius: 8px;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1 class="logo">🚀 VIDA180</h1>
</div>
<h2>Olá, {username}! 👋</h2>
<p>Recebemos uma solicitação para redefinir sua senha.</p>
<div class="alert">
⚠️ Se você não solicitou a alteração de senha, ignore este email e sua senha permanecerá a mesma.
</div>
<p>Para criar uma nova senha, clique no botão abaixo:</p>
<div style="text-align: center;">
<a href="{reset_url}" class="button">
🔑 Redefinir Minha Senha
</a>
</div>
<p style="color: #6b7280; font-size: 14px;">
Este link expira em 24 horas.
</p>
<div style="margin-top: 40px; padding-top: 20px; border-top: 2px solid #e5e7eb; color: #6b7280; font-size: 14px; text-align: center;">
<strong>Vida180</strong> - Transforme sua vida, um dia de cada vez 🚀
</div>
</div>
</body>
</html>
"""
try:
message = Mail(
from_email=Email(settings.SENDGRID_FROM_EMAIL, settings.SENDGRID_FROM_NAME),
to_emails=To(to_email),
subject="🔑 Redefinir senha - Vida180",
html_content=Content("text/html", html_content)
)
sg = SendGridAPIClient(settings.SENDGRID_API_KEY)
response = sg.send(message)
logger.info(f"Email de reset enviado para {to_email}")
return True
except Exception as e:
logger.error(f"Erro ao enviar email de reset: {str(e)}")
return False

View File

@@ -0,0 +1,290 @@
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from sqlalchemy import func, and_
from typing import Optional, Dict
import random
from app.models.message import MotivationalMessage, UserMessageLog
from app.models.habit import Habit, HabitCompletion
from app.models.task import Task
from app.models.health import HealthMetric
from app.models.streak import Streak
class MessageService:
"""Serviço para gerar mensagens contextuais inteligentes"""
def __init__(self, db: Session):
self.db = db
def get_message_of_the_day(self, user_id: str) -> Dict:
"""Retorna a mensagem mais relevante para o usuário"""
# 1. Verificar streak (prioridade alta)
streak_message = self._check_streak_achievements(user_id)
if streak_message:
self._log_message(user_id, streak_message)
return streak_message
# 2. Verificar inatividade (recuperação)
inactive_message = self._check_inactivity(user_id)
if inactive_message:
self._log_message(user_id, inactive_message)
return inactive_message
# 3. Verificar lembretes importantes
reminder_message = self._check_reminders(user_id)
if reminder_message:
self._log_message(user_id, reminder_message)
return reminder_message
# 4. Verificar progresso e milestones
progress_message = self._check_progress(user_id)
if progress_message:
self._log_message(user_id, progress_message)
return progress_message
# 5. Mensagem baseada em hora do dia
time_message = self._get_time_based_message()
if time_message:
self._log_message(user_id, time_message)
return time_message
# 6. Mensagem motivacional padrão
default_message = self._get_daily_motivation()
self._log_message(user_id, default_message)
return default_message
def _check_streak_achievements(self, user_id: str) -> Optional[Dict]:
"""Verifica conquistas de streak"""
# Buscar maior streak do usuário
max_streak = self.db.query(func.max(Streak.current_streak)).filter(
Streak.user_id == user_id
).scalar() or 0
# Definir condições de streak
conditions = [
(100, 'streak_100'),
(60, 'streak_60'),
(30, 'streak_30'),
(14, 'streak_14'),
(7, 'streak_7')
]
for days, condition in conditions:
if max_streak >= days:
# Verificar se já mostrou essa mensagem hoje
already_shown = self.db.query(UserMessageLog).filter(
and_(
UserMessageLog.user_id == user_id,
UserMessageLog.message_type == 'streak_achievement',
func.date(UserMessageLog.shown_at) == datetime.now().date()
)
).first()
if not already_shown:
message = self.db.query(MotivationalMessage).filter(
and_(
MotivationalMessage.message_type == 'streak_achievement',
MotivationalMessage.trigger_condition == condition,
MotivationalMessage.is_active == True
)
).first()
if message:
return self._message_to_dict(message)
return None
def _check_inactivity(self, user_id: str) -> Optional[Dict]:
"""Verifica se o usuário está inativo"""
last_log = self.db.query(UserMessageLog).filter(
UserMessageLog.user_id == user_id
).order_by(UserMessageLog.shown_at.desc()).first()
if not last_log:
return None
days_inactive = (datetime.now() - last_log.shown_at).days
if days_inactive >= 14:
condition = 'inactive_14days'
elif days_inactive >= 7:
condition = 'inactive_7days'
else:
return None
message = self.db.query(MotivationalMessage).filter(
and_(
MotivationalMessage.message_type == 'recovery',
MotivationalMessage.trigger_condition == condition,
MotivationalMessage.is_active == True
)
).first()
return self._message_to_dict(message) if message else None
def _check_reminders(self, user_id: str) -> Optional[Dict]:
"""Verifica lembretes importantes"""
# Verificar última medição de saúde
last_metric = self.db.query(HealthMetric).filter(
HealthMetric.user_id == user_id
).order_by(HealthMetric.measurement_date.desc()).first()
if last_metric:
days_since_last = (datetime.now().date() - last_metric.measurement_date).days
if days_since_last >= 7:
condition = 'no_weight_7days'
elif days_since_last >= 3:
condition = 'no_weight_3days'
else:
condition = None
if condition:
message = self.db.query(MotivationalMessage).filter(
and_(
MotivationalMessage.message_type == 'reminder',
MotivationalMessage.trigger_condition == condition,
MotivationalMessage.is_active == True
)
).first()
if message:
return self._message_to_dict(message)
# Verificar hábitos pendentes
pending_habits = self.db.query(Habit).filter(
and_(
Habit.user_id == user_id,
Habit.is_active == True,
~Habit.id.in_(
self.db.query(HabitCompletion.habit_id).filter(
and_(
HabitCompletion.user_id == user_id,
HabitCompletion.completion_date == datetime.now().date()
)
)
)
)
).count()
if pending_habits > 0:
message = self.db.query(MotivationalMessage).filter(
and_(
MotivationalMessage.message_type == 'reminder',
MotivationalMessage.trigger_condition == 'habits_pending',
MotivationalMessage.is_active == True
)
).first()
if message:
msg_dict = self._message_to_dict(message)
msg_dict['message_text'] = f"🎯 Você tem {pending_habits} hábitos esperando por você hoje!"
return msg_dict
return None
def _check_progress(self, user_id: str) -> Optional[Dict]:
"""Verifica progresso e milestones"""
# Calcular consistência do mês
first_day_month = datetime.now().replace(day=1).date()
total_habits_month = self.db.query(func.count(Habit.id)).filter(
and_(
Habit.user_id == user_id,
Habit.start_date <= datetime.now().date()
)
).scalar()
completed_habits_month = self.db.query(func.count(HabitCompletion.id)).filter(
and_(
HabitCompletion.user_id == user_id,
HabitCompletion.completion_date >= first_day_month
)
).scalar()
if total_habits_month > 0:
consistency = (completed_habits_month / total_habits_month) * 100
if consistency >= 80:
message = self.db.query(MotivationalMessage).filter(
and_(
MotivationalMessage.message_type == 'progress_milestone',
MotivationalMessage.trigger_condition == 'consistency_80',
MotivationalMessage.is_active == True
)
).first()
if message:
msg_dict = self._message_to_dict(message)
msg_dict['message_text'] = f"💎 {int(consistency)}% de consistência esse mês! Top 10% dos usuários!"
return msg_dict
return None
def _get_time_based_message(self) -> Optional[Dict]:
"""Retorna mensagem baseada na hora do dia"""
hour = datetime.now().hour
if 5 <= hour < 12:
condition = 'morning'
elif 12 <= hour < 18:
condition = 'afternoon'
elif 18 <= hour < 22:
condition = 'evening'
else:
return None
message = self.db.query(MotivationalMessage).filter(
and_(
MotivationalMessage.message_type == 'time_based',
MotivationalMessage.trigger_condition == condition,
MotivationalMessage.is_active == True
)
).first()
return self._message_to_dict(message) if message else None
def _get_daily_motivation(self) -> Dict:
"""Retorna uma mensagem motivacional aleatória"""
messages = self.db.query(MotivationalMessage).filter(
and_(
MotivationalMessage.message_type == 'daily_motivation',
MotivationalMessage.is_active == True
)
).all()
if messages:
message = random.choice(messages)
return self._message_to_dict(message)
# Fallback
return {
'id': None,
'message_text': '💪 A transformação acontece um dia de cada vez. Esse dia é hoje!',
'icon': '💪',
'message_type': 'daily_motivation',
'priority': 1
}
def _message_to_dict(self, message: MotivationalMessage) -> Dict:
"""Converte mensagem para dicionário"""
return {
'id': str(message.id) if message.id else None,
'message_text': message.message_text,
'icon': message.icon,
'message_type': message.message_type,
'priority': message.priority
}
def _log_message(self, user_id: str, message: Dict):
"""Registra mensagem exibida no histórico"""
log = UserMessageLog(
user_id=user_id,
message_id=message.get('id'),
message_text=message['message_text'],
message_type=message['message_type']
)
self.db.add(log)
self.db.commit()