Migração de Pipeline: Prefect 1.4 → Prefect 3.0

Visão Geral

Este guia irá ajudá-lo a entender as principais diferenças e como migrar sua pipeline de forma segura.

Principais Mudanças

ComponentePrefect 1.4Prefect 3.0
SchedulesPython codeYAML configuration
Flows@task + Flow()@flow + @task
DeploymentsPython scriptsYAML files
AgentsAgentsWork Pools
ConfigurationPython/JSONYAML

Pré-requisitos

Antes de começar a migração, certifique-se de que você possui:
  • ✅ Pipeline Prefect 1.4 funcionando corretamente
  • ✅ Acesso completo ao código fonte da pipeline
  • ✅ Conhecimento básico de YAML
  • ✅ Acesso ao repositório Git da pipeline

Estrutura de Arquivos

Estrutura Atual (Prefect 1.4)

minha-pipeline/
├── flow.py              # Definição do flow e tasks
├── schedules.py         # Configuração de schedules

Estrutura Nova (Prefect 3.0)

minha-pipeline/
├── flow.py              # Flow com @flow decorator
├── prefect.yaml         # Configuração principal
├── Dockerfile           # Container
└── pyproject.toml       # Dependências Python

Passo a Passo da Migração

💡 Dica para Usuários de IDEs com LLM: Se você usa Cursor ou outra IDE com LLM integrada, pode usar o seguinte prompt para automatizar a migração:
Quero migrar um flow do Prefect 1.4 para o Prefect 3.0. Para isso, siga as etapas abaixo, usando apenas os arquivos cujos caminhos fornecerei:

### Arquivos:
- Schedule original (Prefect 1.4): [CAMINHO_DO_SCHEDULE_ANTIGO]
- YAML de referência (Prefect 3.0): [CAMINHO_YAML_REFERENCIA]
- YAML a ser criado (Prefect 3.0): [CAMINHO_YAML_NOVO]

- Flow base (bem escrito): [CAMINHO_FLOW_REFERENCIA]
- Flow a ser ajustado: [CAMINHO_FLOW_A_MODIFICAR]

---

### Etapas:

1. **Migrar o Schedule:**
   - Pegue as informações do `schedule` do Prefect 1.4 (arquivo original).
   - Converta para formato compatível com o Prefect 3.0.
   - Aplique o estilo e formatação usados no YAML de referência.
   - Gere um novo arquivo YAML no local especificado.

2. **Adaptar o Flow:**
   - Use o flow de referência como modelo de estilo e boas práticas.
   - Modifique o flow de destino para fazer a mesma coisa (ou o equivalente), mas adaptando os nomes de variáveis, funções e fluxo de execução conforme o nome da pasta onde o flow de destino está localizado (use o nome da pasta final do caminho como referência).
   - Garanta compatibilidade com Prefect 3.0.

3. **Output:**
   - Gere o conteúdo final do YAML convertido e do novo flow modificado.
   - Comente resumidamente o que foi alterado em relação aos originais.

1. Gerar Estrutura Base com CookieCutter

Use nosso template do cookiecutter para gerar rapidamente a estrutura de diretórios e arquivos necessários para uma nova pipeline Prefect. Para criar uma nova pipeline, instale uv e rode:
uvx cookiecutter templates --output-dir=pipelines
Você será solicitado a informar valores como secretaria e pipeline, que serão utilizados para preencher os nomes dos diretórios, arquivos e variáveis nos templates. O template gerado já segue o padrão de nomenclatura definido anteriormente. O template irá gerar automaticamente:
  • flow.py - Estrutura base do flow
  • prefect.yaml - Configuração do deployment
  • Dockerfile - Container para execução
  • pyproject.toml - Dependências do projeto
  • .dockerignore - Arquivos ignorados no build

2. Análise da Pipeline Atual

Antes de migrar, analise sua pipeline atual:

2.1 Analisar Configurações Atuais

Identifique as configurações da sua pipeline de dump de banco:
  • Schedules: Frequência, horário e timezone de execução
  • Parâmetros: Configurações específicas do banco e tabelas
  • Dependências: Imports e bibliotecas utilizadas

3. Migração do Schedule

3.1 Analisar o Schedule Atual

Examine seu arquivo schedules.py atual:
# Exemplo: schedules.py (Prefect 1.4)
from prefect.schedules import Schedule
from datetime import timedelta, datetime
import pytz
from prefeitura_rio.pipelines_utils.prefect import generate_dump_db_schedules

# Configuração do schedule
sme_clocks = generate_dump_db_schedules(
    interval=timedelta(days=1),
    start_date=datetime(2022, 1, 1, 21, 0, tzinfo=pytz.timezone("America/Sao_Paulo")),
    table_parameters=sme_core_sso_queries,
)

schedule = Schedule(clocks=untuple(sme_clocks))

3.2 Converter para YAML

Crie a seção de schedules no prefect.yaml:
# prefect.yaml
name: minha-pipeline
prefect-version: 3.4.3

deployments:
  - name: minha-pipeline--prod
    entrypoint: flow.py:main_flow
    work_pool:
      name: default-agent-pool
      work_queue_name: default
    schedules:
      - interval: 86400  # 24 horas em segundos
        anchor_date: "2022-01-01T21:00:00"
        timezone: America/Sao_Paulo
        slug: daily-execution
        parameters:
          table_id: minha_tabela
          execute_query: |
            SELECT * FROM minha_tabela 
            WHERE data_atualizacao >= CURRENT_DATE - 1;

3.3 Mapeamento de Configurações

Prefect 1.4Prefect 3.0Exemplo
timedelta(days=1)interval: 8640086400 segundos = 1 dia
timedelta(hours=6)interval: 2160021600 segundos = 6 horas
start_dateanchor_date”2022-01-01T21:00:00”
timezonetimezone”America/Sao_Paulo”
table_parametersparametersParâmetros específicos

4. Migração do Flow

4.1 Identificar o Template

Para pipelines de dump de banco, use o template específico:
from iplanrio.pipelines_templates.dump_db.tasks import (
    dump_upload_batch_task,
    format_partitioned_query_task,
)

4.2 Adaptar Imports

# Antes (Prefect 1.4)
from prefect import task, Flow
from prefect.schedules import Schedule
from prefeitura_rio.pipelines_utils.prefect import generate_dump_db_schedules

# Depois (Prefect 3.0)
from prefect import task, flow
from iplanrio.pipelines_templates.dump_db.tasks import (
    dump_upload_batch_task,
    format_partitioned_query_task,
)

4.3 Adaptar Decorators

# Antes (Prefect 1.4)
@task
def minha_task():
    return "resultado"

def main_flow():
    result = minha_task()
    return result

# Criar o flow
with Flow("minha-pipeline") as flow:
    main_flow()

# Depois (Prefect 3.0)
@task
def minha_task():
    return "resultado"

@flow(log_prints=True, name="minha-pipeline")
def main_flow():
    result = minha_task()
    return result

4.4 Adaptar Parâmetros

# Antes (Prefect 1.4)
def main_flow(param1, param2="default"):
    # lógica da pipeline
    pass

# Depois (Prefect 3.0)
@flow(log_prints=True)
def main_flow(
    param1: str,
    param2: str = "default",
    param3: Optional[str] = None,
):
    # lógica da pipeline
    pass

5. Configuração do prefect.yaml

5.1 Estrutura Básica

name: minha-pipeline
prefect-version: 3.4.3

build:
  - prefect.deployments.steps.run_shell_script:
      id: get-commit-hash
      script: git rev-parse --short HEAD
      stream_output: false
  - prefect_docker.deployments.steps.build_docker_image:
      id: build-image
      requires: prefect-docker>=0.6.5
      image_name: seu-registro/imagem
      tag: "minha-pipeline-{{ get-commit-hash.stdout }}"
      dockerfile: Dockerfile

deployments:
  - name: minha-pipeline--staging
    version: "{{ build-image.tag }}"
    entrypoint: flow.py:main_flow
    work_pool:
      name: default-agent-pool
      work_queue_name: default
      job_variables:
        image: "{{ build-image.image_name }}:{{ build-image.tag }}"
        command: python -m prefect flow-run execute

5.2 Configuração de Schedules

deployments:
  - name: minha-pipeline--prod
    version: "{{ build-image.tag }}"
    entrypoint: flow.py:main_flow
    work_pool:
      name: default-agent-pool
      work_queue_name: default
      job_variables:
        image: "{{ build-image.image_name }}:{{ build-image.tag }}"
        command: python -m prefect flow-run execute
    schedules:
      - interval: 86400  # 24 horas
        anchor_date: "2022-01-01T21:00:00"
        timezone: America/Sao_Paulo
        slug: daily-execution
        parameters:
          param1: valor1
          param2: valor2

6. Configuração de Work Pool

Este repositório utiliza dois work pools principais para execução dos deployments Prefect:
  • default-pool: Destinado à execução geral de pipelines, incluindo fluxos que não possuem requisitos especiais de rede ou infraestrutura. É o pool padrão para a maioria dos deployments.
  • datario-pool: Utilizado para pipelines que acessam bancos de dados ou sistemas internos da IplanRio, especialmente aqueles que exigem conexão via VPN. Esse pool garante que os jobs sejam executados em ambientes com acesso seguro e autorizado aos recursos internos.
Para pipelines de dump de banco de dados, use o datario-pool:
work_pool:
  name: datario-pool
  work_queue_name: default
  job_variables:
    image: "{{ build-image.image_name }}:{{ build-image.tag }}"
    command: python -m prefect flow-run execute

7. Migração de Secrets para Infisical

Para pipelines de dump de banco de dados, é necessário configurar as credenciais de acesso no Infisical. Acesse infisical.iplan.dados.rio e configure os secrets nos projetos:
  • prefect-jobs (produção)
  • prefect-jobs-staging (staging)

7.1 Estrutura de Pastas

Crie uma pasta com o nome da pipeline seguindo o seguinte padrão. Exemplo para a pipeline db-gestao-escolar:
Pipeline: db-gestao-escolar
├── Pasta: db-gestao-escolar
    ├── DB_GESTAO_ESCOLAR__DB_USERNAME
    ├── DB_GESTAO_ESCOLAR__DB_PASSWORD
    └── Outras variáveis específicas

7.2 Padrão de Nomenclatura

Para pipelines de ingestão de banco de dados, use sempre o padrão:
  • {PIPELINE_NAME_UPPER}__DB_USERNAME
  • {PIPELINE_NAME_UPPER}__DB_PASSWORD

7.3 Configuração no Flow

No seu flow, referencie as variáveis usando o caminho do Infisical:
@flow(log_prints=True)
def dump_database_flow(
    # ... outros parâmetros ...
    infisical_secret_path: str = "/db-gestao-escolar",
):
    # O Prefect irá buscar automaticamente as variáveis
    # DB_GESTAO_ESCOLAR__DB_USERNAME e DB_GESTAO_ESCOLAR__DB_PASSWORD
    # no caminho /db-gestao-escolar do Infisical

Exemplos Práticos

Exemplo Completo

Flow (flow.py)

from prefect import flow, task
from iplanrio.pipelines_templates.dump_db.tasks import (
    dump_upload_batch_task,
    format_partitioned_query_task,
)

@flow(log_prints=True)
def dump_database_flow(
    db_database: str = "meu_banco",
    db_host: str = "localhost",
    db_port: str = "1433",
    db_type: str = "sql_server",
    execute_query: str = "SELECT * FROM minha_tabela",
    dataset_id: str = "meu_dataset",
    table_id: str = "minha_tabela",
    infisical_secret_path: str = "/secrets/database",
    dump_mode: str = "overwrite",
    batch_size: int = 50000,
    biglake_table: bool = True,
):
    print(f"Iniciando dump da tabela {table_id}")
    
    # Executar o dump
    result = dump_upload_batch_task(
        db_database=db_database,
        db_host=db_host,
        db_port=db_port,
        db_type=db_type,
        execute_query=execute_query,
        dataset_id=dataset_id,
        table_id=table_id,
        infisical_secret_path=infisical_secret_path,
        dump_mode=dump_mode,
        batch_size=batch_size,
        biglake_table=biglake_table,
    )
    
    print(f"Dump concluído: {result}")
    return result

Configuração (prefect.yaml)

name: dump-database-pipeline
prefect-version: 3.4.3

build:
  - prefect.deployments.steps.run_shell_script:
      id: get-commit-hash
      script: git rev-parse --short HEAD
      stream_output: false
  - prefect_docker.deployments.steps.build_docker_image:
      id: build-image
      requires: prefect-docker>=0.6.5
      image_name: seu-registro/imagem
      tag: "dump-database-{{ get-commit-hash.stdout }}"
      dockerfile: Dockerfile

deployments:
  - name: dump-database--prod
    version: "{{ build-image.tag }}"
    entrypoint: flow.py:dump_database_flow
    work_pool:
      name: datario-pool
      work_queue_name: default
      job_variables:
        image: "{{ build-image.image_name }}:{{ build-image.tag }}"
        command: python -m prefect flow-run execute
    schedules:
      - interval: 86400
        anchor_date: "2022-01-01T21:00:00"
        timezone: America/Sao_Paulo
        slug: daily-dump
        parameters:
          db_database: "meu_banco"
          table_id: "minha_tabela"
          execute_query: "SELECT * FROM minha_tabela WHERE data_atualizacao >= CURRENT_DATE - 1"