Skip to main content

Construindo uma Pipeline com Prefect 3

Este guia explica como criar e configurar pipelines de dados usando Prefect 3 na Prefeitura do Rio de Janeiro, seguindo as melhores práticas e padrões estabelecidos pela equipe IplanRio.

✅ Pré-requisitos

  • Acesso de leitura ao projeto BigQuery rj-iplanrio
  • Permissões de colaborador no repositório GitHub
  • Conhecimento básico de Git e GitHub
  • Python 3.8+ instalado
  • uv package manager instalado

🔧 Configuração Inicial

1. Clonar o Repositório

git clone https://github.com/prefeitura-rio/prefect_rj_iplanrio.git
cd prefect_rj_iplanrio

🚀 Boas Práticas de Desenvolvimento

1. Estrutura de Branch

# Criar branch para sua pipeline
git checkout -b staging/nome-da-pipeline

# Exemplo para pipeline de saúde
git checkout -b staging/pipeline-dados-saude-sms
IMPORTANTE: Use sempre o prefixo staging/ no nome da branch para que o CI/CD reconheça e processe sua pipeline automaticamente.

2. Nomenclatura de Pipelines

Siga o padrão estabelecido:
# ✅ Correto: Nome descritivo e em snake_case
@flow(log_prints=True)
def rj_cvl__os_info():
    pass

# ❌ Evite: Nomes genéricos
@flow(log_prints=True)
def extracao_de_dados():
    pass

🧪 Criação de Nova Pipeline

1. Usar Template Cookiecutter

O repositório utiliza cookiecutter para facilitar a criação de novas pipelines de forma padronizada. Os templates disponíveis em templates/ permitem gerar rapidamente a estrutura de diretórios e arquivos necessários para uma nova pipeline Prefect, incluindo Dockerfile, flow.py, prefect.yaml e pyproject.toml. Para criar uma nova pipeline, instale uv e rode:
uvx cookiecutter templates --output-dir=pipelines
Você será solicitado a informar valores como secretaria e pipeline, que serão utilizados para preencher os nomes dos diretórios, arquivos e variáveis nos templates. O template gerado já segue o padrão de nomenclatura definido anteriormente. Exemplo de interação:
$ uvx cookiecutter templates --output-dir=pipelines
secretaria [sms]: sms
pipeline [dados_pacientes]: dados_saude_sms

2. Configurar flow.py

Após gerar o template, substitua os valores padrão no arquivo flow.py pelos valores específicos da sua pipeline:
IMPORTANTE: É necessário solicitar à equipe de desenvolvimento da IplanRio para adicionar o usuário e senha do seu banco de origem no Infisical no caminho especificado em infisical_secret_path. Sem essas credenciais configuradas, sua pipeline não conseguirá se conectar ao banco de dados.
# Exemplo de configuração em flow.py
@flow(log_prints=True)
def pipeline_dados_saude_sms():
    # Configurações da base de dados
    db_database = "banco1"  # Nome do Banco de Origem
    db_host = "10.15.255.1"  # Host do Banco de Origem
    db_port = 3306  # Porta do Banco de Origem
    db_type = "mysql"  # Tipo do Banco de Origem
    
    # Configurações do dataset
    dataset_id = "nome_do_dataset"  # Dataset de destino em rj-iplanrio
    
    # Configurações do Infisical
    infisical_secret_path = "/db-seu-banco-de-origem"  # Caminho dos secrets
    
Parâmetros importantes:
  • db_database: Nome do banco de dados de origem (ex: banco1)
  • db_host: Host do banco de dados de origem (ex: 10.15.255.1)
  • db_port: Porta de conexão do banco de dados de origem (ex: 3306)
  • db_type: Tipo do banco de dados de origem (ex: mysql, postgres, etc)
  • dataset_id: Dataset de destino no BigQuery (todas as extrações são adicionadas ao projeto rj-iplanrio)
  • infisical_secret_path: Caminho dos secrets no Infisical

3. Configurar prefect.yaml

Configure os schedules no arquivo prefect.yaml conforme suas necessidades:

Schedule Overwrite

schedules:
  # Exemplo de schedule overwrite
  - interval: 86400 # executa a cada 24h
    anchor_date: "2022-01-01T01:00:00" # Data de inicio do schedule em UTC
    timezone: America/Sao_Paulo
    slug: dados_saude_sms_diario # slug do schedule
    parameters:
      table_id: pacientes_sms # Nome da tabela no BigQuery
      execute_query: |
        SELECT
          id_paciente,
          nome,
          data_nascimento,
          telefone
        FROM banco1.saude

Schedule Incremental (Incremental)

schedules:
  # Exemplo de schedule incremental
  - interval: 86400
    anchor_date: "2022-01-01T02:32:00"
    timezone: America/Sao_Paulo
    slug: dados_saude_sms_incremental
    parameters:
      table_id: pacientes_sms_incremental
      dump_mode: append
      partition_columns: data_atualizacao # Coluna de particionamento
      partition_date_format: "%Y-%m-%d" # Formato da data de particionamento
      break_query_frequency: day # Frequencia de particionamento
      break_query_start: current_day # Data de inicio do particionamento
      break_query_end: current_day # Data de fim do particionamento
      execute_query: |
        SELECT
          id_paciente,
          nome,
          data_nascimento,
          telefone,
          data_atualizacao
        FROM banco1.saude
Explicação dos parâmetros:
  • interval: Intervalo entre as runs em segundos (86400 = 24 horas)
  • anchor_date: Data de início do schedule em UTC
  • timezone: Fuso horário (America/Sao_Paulo)
  • slug: Identificador único do schedule
  • table_id: Nome da tabela no BigQuery
  • dump_mode: Modo de inserção (append para incremental, overwrite para substituição)
  • partition_columns: Coluna(s) para particionamento
  • partition_date_format: Formato da data de particionamento
  • break_query_frequency: Frequência de particionamento (day, month, year)
  • break_query_start/end: Período de dados a serem processados

4. Commit e Push

# 1. Adicionar arquivos
git add pipelines/sua_secretaria/sua_pipeline/

# 2. Commit com mensagem descritiva
git commit -m "feat: adiciona pipeline de dados de saúde SMS

- Cria pipeline para extração de dados SMS
- Configura schedule diário incremental
- Adiciona particionamento por data"

# 3. Push para branch
git push origin staging/sua-pipeline

5. Pull Request

  1. Criar PR no GitHub
  2. Descrição: Explicar mudanças e impacto
  3. Review: Solicitar revisão da equipe IplanRio
  4. Testes: Garantir que todos os testes passem
  5. Teste em Staging: Após o deploy em staging funcionar, você já pode testar sua pipeline no ambiente de desenvolvimento
  6. Merge: Após aprovação, merge para main

Workflow de CI/CD Automático

O repositório utiliza GitHub Actions para automatizar todo o ciclo de vida das pipelines, incluindo build, deploy e publicação de imagens Docker: 🚀 Deploy Automático dos Flows: O sistema possui dois workflows principais para deploy automático:
  • .github/workflows/deploy-prefect-flows-staging.yaml - Ambiente de Staging
    • Acionado a cada push em branches staging/*
    • Pode ser executado manualmente
    • Monitora alterações em pipelines/**
  • .github/workflows/deploy-prefect-flows-prod.yaml - Ambiente de Produção
    • Acionado a cada push na branch master
    • Pode ser executado manualmente
    • Monitora alterações em pipelines/**
🔧 Processo de Deploy: Ambos os workflows executam as seguintes etapas:
  1. Checkout do código-fonte
  2. Login no GitHub Container Registry (ghcr.io)
  3. Instalação das dependências Python com uv
  4. Execução do script de deploy .github/scripts/deploy_prefect_flows.py
    • Faz o deploy automático de todos os flows definidos em pipelines/*/prefect.yaml
    • Caso algum deploy falhe, o workflow é interrompido e o erro é registrado nos logs
🐳 Build e Publicação da Imagem Base: O workflow .github/workflows/build-and-push-root-dockerfile.yaml:
  • Acionado: Em alterações no Dockerfile da raiz ou push na branch master
  • Execução manual: Disponível quando necessário
  • Processo:
    • Build da imagem Docker definida no Dockerfile do repositório
    • Publicação da imagem no GitHub Container Registry (ghcr.io/${{ github.repository }}:latest)
O workflow de staging permite testar suas pipelines em um ambiente isolado antes do deploy em produção. Após o deploy em staging funcionar, você já pode testar sua pipeline no ambiente de desenvolvimento. Apenas após o merge na branch master é que as pipelines são deployadas em produção.
IMPORTANTE: Se algum deploy falhar durante o processo, o workflow será interrompido e você precisará corrigir os problemas antes de tentar novamente.
📊 Monitoramento do CI/CD:
  • Acompanhe o progresso dos deploys na aba “Actions” do seu repositório
  • Verifique os logs para identificar possíveis erros
  • Aguarde a conclusão bem-sucedida antes de solicitar review da equipe
  • Se houver falhas, corrija os problemas e faça novo commit - o CI será executado novamente

🔧 Troubleshooting

Problemas Comuns

Erro de conexão com banco de dados:
  • Verifique se as credenciais estão corretas no Infisical
  • Confirme se o host e porta estão acessíveis
  • Teste a conexão manualmente
Falha no deploy:
  • Verifique os logs na aba “Actions” do GitHub
  • Confirme se todos os arquivos necessários foram commitados
  • Valide a sintaxe do prefect.yaml
Pipeline não executa no schedule:
  • Verifique se o anchor_date está no passado
  • Confirme se o timezone está correto
  • Valide se o interval está em segundos
Dados não aparecem no BigQuery:
  • Verifique se o dataset_id está correto
  • Confirme se a query está retornando dados
  • Valide as permissões de escrita no projeto rj-iplanrio

📚 Recursos Adicionais


I