Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 90 additions & 78 deletions FurLab.CLI/Commands/QueryCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static class QueryCommand
},
ShouldHandle = static args =>
{
var handled = args.Outcome.Exception is NpgsqlException or TimeoutException or OperationCanceledException;
var handled = args.Outcome.Exception is NpgsqlException or TimeoutException;
return new ValueTask<bool>(handled);
}
})
Expand Down Expand Up @@ -305,13 +305,43 @@ private static async Task ExecuteOnSelectedServers(List<ServerConfigEntry> selec
var errorFilePath = Path.Combine(executionDirectory, $"{timestamp}_erros.csv");
var logFilePath = Path.Combine(executionDirectory, $"{timestamp}_log.csv");

using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cts.Cancel();
};

var allDatabases = new List<(ServerConfigEntry Server, string Database)>();
foreach (var server in selectedServers)
var hasAutoDiscover = selectedServers.Any(s => s.FetchAllDatabases);

if (hasAutoDiscover)
{
await AnsiConsole.Status()
.Spinner(Spinner.Known.Dots)
.SpinnerStyle(Style.Parse("grey"))
.StartAsync("Discovering databases...", async ctx =>
{
foreach (var server in selectedServers)
{
ctx.Status($"Discovering databases on [bold]{Markup.Escape(server.Name)}[/]...");
var databases = await GetDatabasesForServerAsync(server, cts.Token);
foreach (var db in databases)
{
allDatabases.Add((server, db));
}
}
});
}
else
{
var databases = await GetDatabasesForServerAsync(server, CancellationToken.None);
foreach (var db in databases)
foreach (var server in selectedServers)
{
allDatabases.Add((server, db));
var databases = await GetDatabasesForServerAsync(server, cts.Token);
foreach (var db in databases)
{
allDatabases.Add((server, db));
}
}
}

Expand Down Expand Up @@ -363,26 +393,30 @@ private static async Task ExecuteOnSelectedServers(List<ServerConfigEntry> selec
}
});

await AnsiConsole.Progress()
.Columns(new ProgressColumn[]
{
new TaskDescriptionColumn(),
new ProgressBarColumn(),
new PercentageColumn(),
new RemainingTimeColumn(),
new SpinnerColumn()
})
var resultsTable = new Table()
.Border(TableBorder.Rounded)
.AddColumn(new TableColumn("[grey]Status[/]").Centered().Width(8))
.AddColumn(new TableColumn("[grey]Server[/]"))
.AddColumn(new TableColumn("[grey]Database[/]"))
.AddColumn(new TableColumn("[grey]Rows[/]").RightAligned())
.AddColumn(new TableColumn("[grey]Duration[/]").RightAligned())
.AddColumn(new TableColumn("[grey]Detail[/]"));

var tableLock = new object();

await AnsiConsole.Live(resultsTable)
.AutoClear(false)
.Overflow(VerticalOverflow.Ellipsis)
.StartAsync(async ctx =>
{
var progressTask = ctx.AddTask($"[bold]Executing across {selectedServers.Count} servers, {totalDatabases} databases[/]", maxValue: totalDatabases);
ctx.Refresh();

var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = defaults.MaxParallelism
MaxDegreeOfParallelism = defaults.MaxParallelism,
CancellationToken = cts.Token
};

var completedCount = 0;

await Parallel.ForEachAsync(selectedServers, parallelOptions, async (server, ct) =>
{
var databases = allDatabases
Expand All @@ -392,7 +426,11 @@ await Parallel.ForEachAsync(selectedServers, parallelOptions, async (server, ct)

if (databases.Count == 0)
{
ctx.AddTask($"[yellow]{Markup.Escape(server.Name)}: no databases[/]").StopTask();
lock (tableLock)
{
resultsTable.AddRow("[yellow]—[/]", Markup.Escape(server.Name), "[grey]—[/]", "[grey]—[/]", "[grey]—[/]", "[yellow]no databases[/]");
ctx.Refresh();
}
return;
}

Expand All @@ -418,7 +456,17 @@ await Parallel.ForEachAsync(databases, serverParallelOptions, async (database, d
Interlocked.Increment(ref successCount);
Interlocked.Add(ref totalRowCount, queryResult.Data.Count);

ctx.AddTask($" [green]✓ {Markup.Escape(server.Name)}/{Markup.Escape(database)}[/] — {queryResult.Data.Count} rows ({sw.Elapsed.TotalSeconds:F1}s)").StopTask();
lock (tableLock)
{
resultsTable.AddRow(
"[green]✓[/]",
Markup.Escape(server.Name),
Markup.Escape(database),
queryResult.Data.Count.ToString(CultureInfo.InvariantCulture),
$"{sw.Elapsed.TotalSeconds:F1}s",
"[grey]—[/]");
ctx.Refresh();
}
}
catch (Exception ex)
{
Expand All @@ -428,19 +476,24 @@ await Parallel.ForEachAsync(databases, serverParallelOptions, async (database, d

Interlocked.Increment(ref failureCount);

ctx.AddTask($" [red]✗ {Markup.Escape(server.Name)}/{Markup.Escape(database)}[/] — {Markup.Escape(ex.Message)}").StopTask();
lock (tableLock)
{
resultsTable.AddRow(
"[red]✗[/]",
Markup.Escape(server.Name),
Markup.Escape(database),
"[grey]—[/]",
"[grey]—[/]",
$"[red]{Markup.Escape(ex.Message)}[/]");
ctx.Refresh();
}
}

Interlocked.Increment(ref completedCount);
progressTask.Increment(1);
});
});

progressTask.StopTask();
});

channel.Writer.Complete();
await writerCompleted.Task;
await writerCompleted.Task.WaitAsync(cts.Token);

CsvExporter.MergeServerCsvsToConsolidated(executionDirectory, timestamp, selectedServers.Select(s => s.Name).ToList());

Expand Down Expand Up @@ -516,79 +569,38 @@ await Parallel.ForEachAsync(databases, serverParallelOptions, async (database, d
}

/// <summary>
/// Gets the list of accessible databases for a server.
/// Uses explicitly configured databases, auto-discovers via <c>pg_database</c> query when
/// <c>FetchAllDatabases</c> is true, and falls back to configured databases if discovery fails.
/// Each candidate database is validated by attempting a test connection.
/// Gets the list of databases for a server.
/// When <c>FetchAllDatabases</c> is true, auto-discovers via <c>pg_database</c> query,
/// falling back to configured databases if discovery fails.
/// Otherwise, returns the databases listed in the server configuration directly.
/// </summary>
private static async Task<List<string>> GetDatabasesForServerAsync(ServerConfigEntry server, CancellationToken ct)
{
if (!server.FetchAllDatabases && server.Databases.Count > 0)
{
return await ValidateDatabaseAccessAsync(server, server.Databases, ct);
}

if (server.FetchAllDatabases)
{
try
{
var discoveredDatabases = await ListDatabasesAsync(server, ct);
return await ValidateDatabaseAccessAsync(server, discoveredDatabases, ct);
return await ListDatabasesAsync(server, ct);
}
catch (Exception ex)
{
AnsiConsole.MarkupLine($"[yellow]Warning: Auto-discovery failed for '{server.Name}': {ex.Message}[/]");
if (server.Databases.Count > 0)
{
AnsiConsole.MarkupLine("[yellow]Falling back to configured databases.[/]");
return await ValidateDatabaseAccessAsync(server, server.Databases, ct);
return server.Databases;
}
return [];
}
}

var defaultDb = server.Databases.FirstOrDefault() ?? string.Empty;
if (string.IsNullOrEmpty(defaultDb))
{
return [];
}

return await ValidateDatabaseAccessAsync(server, [defaultDb], ct);
}

/// <summary>
/// Validates access to each database by attempting a test connection and a <c>SELECT 1</c> query.
/// Returns only databases that are accessible; inaccessible databases are logged as warnings.
/// </summary>
private static async Task<List<string>> ValidateDatabaseAccessAsync(ServerConfigEntry server, List<string> databases, CancellationToken ct)
{
var accessibleDatabases = new List<string>();

foreach (var database in databases)
if (server.Databases.Count > 0)
{
if (string.IsNullOrWhiteSpace(database))
{
continue;
}

try
{
var connectionString = BuildConnectionStringForServer(server, database);
await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync(ct);

await using var command = new NpgsqlCommand("SELECT 1", connection);
await command.ExecuteScalarAsync(ct);

accessibleDatabases.Add(database);
}
catch (Exception ex)
{
AnsiConsole.MarkupLine($"[yellow]Warning: Cannot access database '{database}' on server '{server.Name}': {ex.Message}[/]");
}
return server.Databases;
}

return accessibleDatabases;
var defaultDb = server.Databases.FirstOrDefault() ?? string.Empty;
return string.IsNullOrEmpty(defaultDb) ? [] : [defaultDb];
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-04-15
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
## Contexto

O `QueryCommand` cria um `CancellationTokenSource` interno e registra um handler de `Console.CancelKeyPress` para cancelar o token quando o usuário pressiona Ctrl+C. Porém, há três pontos onde o token não chega corretamente:

1. O `ParallelOptions` do loop externo (`Parallel.ForEachAsync` por servidor, linha 414) não recebe `CancellationToken`, fazendo com que o `ct` injetado pelo framework nos lambdas seja `CancellationToken.None`.
2. O `ResiliencePipeline` do Polly tem `ShouldHandle` que inclui `OperationCanceledException`, causando até 3 retentativas (com backoff de 500ms, 1000ms, 2000ms) após o cancelamento.
3. O `await writerCompleted.Task` não recebe nenhum token, podendo travar indefinidamente após as queries serem interrompidas.

## Objetivos / Não-Objetivos

**Objetivos:**
- Ctrl+C deve interromper as queries em andamento imediatamente (no cliente e no servidor PostgreSQL)
- O processo deve encerrar limpo, sem travar, após o cancelamento
- O Polly não deve retentar operações canceladas intencionalmente

**Não-Objetivos:**
- Não alterar a lógica de retry para falhas transientes de rede (`NpgsqlException`, `TimeoutException`)
- Não adicionar confirmação ou prompt antes de cancelar
- Não alterar comportamento de cancelamento em outros comandos (`DatabaseCommand`, etc.)

## Decisões

### Decisão 1: Passar `cts.Token` ao `ParallelOptions` externo

O `ParallelOptions` da linha 414 deve receber `CancellationToken = cts.Token`. Isso faz com que o `ct` injetado pelo framework nos lambdas seja o token real, que é repassado corretamente ao loop interno e às chamadas Npgsql.

**Alternativa considerada:** Capturar `cts.Token` por closure no lambda externo em vez de usar `ParallelOptions.CancellationToken`. Rejeitado porque o `Parallel.ForEachAsync` não interromperia o agendamento de novas iterações — seria necessário checar manualmente o token no início de cada lambda.

---

### Decisão 2: Remover `OperationCanceledException` do `ShouldHandle` do Polly

O `ResiliencePipeline` estático deve tratar apenas `NpgsqlException` e `TimeoutException` como erros retryáveis. `OperationCanceledException` representa cancelamento intencional e não deve ser retentado.

**Alternativa considerada:** Criar um segundo pipeline sem retry para uso com cancelamento. Rejeitado por complexidade desnecessária — o caminho certo é não retentar cancelamentos nunca.

---

### Decisão 3: Passar o token ao `writerCompleted.Task` via `WaitAsync`

Substituir `await writerCompleted.Task` por `await writerCompleted.Task.WaitAsync(cts.Token)` para garantir que o processo não trave após cancelamento. Se o writer não drenar o channel a tempo, o `WaitAsync` lança `OperationCanceledException` e o processo encerra.

**Alternativa considerada:** Adicionar um timeout fixo (ex: 5s). Rejeitado porque o comportamento correto é respeitar o mesmo token de cancelamento já em uso, sem introduzir valores mágicos.

## Riscos / Trade-offs

- **[Risco] Perda de linhas já lidas mas não escritas no CSV** → Após cancelamento, o channel pode conter linhas em trânsito que não serão persistidas. Mitigação: aceitável — o usuário pediu para parar; o log e o CSV ficam parciais, mas a execução encerra limpa.
- **[Trade-off] `WaitAsync` pode lançar se o writer ainda não drenou** → O writer task continuará rodando em background por um momento até o GC coletar. Não causa vazamento — o processo encerra logo após o `OperationCanceledException` ser capturado pelo `Program.Main`.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
## Por que

O comando `fur query run` não responde ao Ctrl+C durante a execução de queries. O `CancellationToken` criado pelo handler de Ctrl+C nunca é passado para o `Parallel.ForEachAsync` externo (por servidor), fazendo com que todas as queries em andamento continuem executando até o fim no PostgreSQL, independentemente de quantas vezes o usuário pressione Ctrl+C.

## O que Muda

- O `CancellationToken` do `CancellationTokenSource` interno deve ser passado ao `ParallelOptions` do loop externo (`Parallel.ForEachAsync` por servidor)
- O Polly não deve retentar `OperationCanceledException` — cancelamento é intencional, não uma falha transiente
- O `await writerCompleted.Task` deve respeitar o cancelamento para evitar travamento após as queries serem interrompidas

## Capacidades

### Novas Capacidades

Nenhuma.

### Capacidades Modificadas

- `query-run-multi-server-execution`: O comportamento de cancelamento via Ctrl+C muda — queries em andamento devem ser interrompidas imediatamente ao sinal de cancelamento, e o Polly não deve retentar operações canceladas intencionalmente.

## Impacto

- `FurLab.CLI/Commands/QueryCommand.cs`: ajustes no `ResiliencePipeline` (ShouldHandle), no `ParallelOptions` externo e no `await writerCompleted.Task`
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
## MODIFICADO Requirements

### Requirement: Retry com Polly
O sistema DEVE usar Polly para retries automáticos em falhas transitórias de conexão. O sistema NÃO DEVE retentar operações canceladas intencionalmente pelo usuário.

#### Cenário: Falha transiente de rede
- **QUANDO** conexão falha por timeout ou erro transitório de rede (`NpgsqlException` ou `TimeoutException`)
- **ENTÃO** sistema tenta reconectar até 3 vezes com backoff exponencial
- **E** se todas as tentativas falham, registra erro e continua

#### Cenário: Cancelamento pelo usuário durante retry
- **QUANDO** usuário pressiona Ctrl+C enquanto uma query está sendo executada ou retentada
- **ENTÃO** sistema NÃO inicia nova tentativa
- **E** sistema encerra a execução imediatamente

## ADICIONADO Requirements

### Requirement: Cancelamento via Ctrl+C
O sistema DEVE interromper todas as queries em andamento quando o usuário pressiona Ctrl+C, encerrando o processo limpo sem travar.

#### Cenário: Ctrl+C durante execução de query
- **QUANDO** usuário pressiona Ctrl+C durante a execução de queries
- **ENTÃO** sistema cancela todas as queries em andamento em todos os servidores e databases
- **E** sistema exibe mensagem de cancelamento
- **E** sistema encerra com exit code 130

#### Cenário: Ctrl+C com múltiplos servidores em paralelo
- **QUANDO** usuário pressiona Ctrl+C enquanto queries estão rodando em múltiplos servidores simultaneamente
- **ENTÃO** sistema cancela todos os loops paralelos (por servidor e por database)
- **E** nenhuma nova query é iniciada após o sinal de cancelamento

#### Cenário: Ctrl+C pressionado múltiplas vezes
- **QUANDO** usuário pressiona Ctrl+C mais de uma vez
- **ENTÃO** sistema encerra da mesma forma que no primeiro Ctrl+C
- **E** não há travamento nem execução duplicada de cancelamento
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
## 1. Corrigir propagação do CancellationToken

- [x] 1.1 Em `QueryCommand.cs`, adicionar `CancellationToken = cts.Token` ao `ParallelOptions` externo (linha ~414) do `Parallel.ForEachAsync` por servidor

## 2. Corrigir o ResiliencePipeline do Polly

- [x] 2.1 Em `QueryCommand.cs`, remover `OperationCanceledException` do `ShouldHandle` do `ResiliencePipeline` estático, mantendo apenas `NpgsqlException` e `TimeoutException`

## 3. Corrigir o await do writer task

- [x] 3.1 Em `QueryCommand.cs`, substituir `await writerCompleted.Task` por `await writerCompleted.Task.WaitAsync(cts.Token)` para evitar travamento após cancelamento

## 4. Verificação

- [x] 4.1 Executar `fur query run` com uma query longa e confirmar que Ctrl+C interrompe a execução imediatamente
- [x] 4.2 Confirmar que o processo encerra com exit code 130
- [x] 4.3 Confirmar que falhas transientes de rede ainda são retentadas (comportamento do Polly preservado)
- [x] 4.4 Executar os testes existentes e confirmar que não há regressões
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-04-14
Loading
Loading