|
| 1 | +package main |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "flag" |
| 6 | + "fmt" |
| 7 | + "io" |
| 8 | + "log" |
| 9 | + "log/slog" |
| 10 | + "os" |
| 11 | + "os/signal" |
| 12 | + "path/filepath" |
| 13 | + "syscall" |
| 14 | + "time" |
| 15 | + |
| 16 | + "github.com/OCAP2/web/internal/server" |
| 17 | + "github.com/OCAP2/web/internal/conversion" |
| 18 | + "github.com/OCAP2/web/internal/storage" |
| 19 | + "github.com/labstack/echo/v4" |
| 20 | + "github.com/labstack/echo/v4/middleware" |
| 21 | +) |
| 22 | + |
| 23 | +func main() { |
| 24 | + if len(os.Args) > 1 && os.Args[1] == "convert" { |
| 25 | + if err := runConvert(os.Args[2:]); err != nil { |
| 26 | + log.Fatalf("convert: %v", err) |
| 27 | + } |
| 28 | + return |
| 29 | + } |
| 30 | + |
| 31 | + if err := app(); err != nil { |
| 32 | + log.Panicln(err) |
| 33 | + } |
| 34 | +} |
| 35 | + |
| 36 | +func runConvert(args []string) error { |
| 37 | + fs := flag.NewFlagSet("convert", flag.ExitOnError) |
| 38 | + inputFile := fs.String("input", "", "Convert a single JSON file") |
| 39 | + all := fs.Bool("all", false, "Convert all pending operations") |
| 40 | + status := fs.Bool("status", false, "Show conversion status of all operations") |
| 41 | + setFormat := fs.String("set-format", "", "Set storage format for an operation (use with --id)") |
| 42 | + opID := fs.Int64("id", 0, "Operation ID (for --set-format)") |
| 43 | + chunkSize := fs.Uint("chunk-size", 300, "Frames per chunk (default: 300)") |
| 44 | + format := fs.String("format", "protobuf", "Output format: protobuf or flatbuffers") |
| 45 | + |
| 46 | + fs.Usage = func() { |
| 47 | + fmt.Fprintf(os.Stderr, "Usage: %s convert [options]\n\n", os.Args[0]) |
| 48 | + fmt.Fprintf(os.Stderr, "Options:\n") |
| 49 | + fs.PrintDefaults() |
| 50 | + fmt.Fprintf(os.Stderr, "\nExamples:\n") |
| 51 | + fmt.Fprintf(os.Stderr, " %s convert --input mission.json.gz Convert to protobuf\n", os.Args[0]) |
| 52 | + fmt.Fprintf(os.Stderr, " %s convert --input mission.json.gz --format flatbuffers Convert to flatbuffers\n", os.Args[0]) |
| 53 | + fmt.Fprintf(os.Stderr, " %s convert --all Convert all pending\n", os.Args[0]) |
| 54 | + fmt.Fprintf(os.Stderr, " %s convert --status Show conversion status\n", os.Args[0]) |
| 55 | + fmt.Fprintf(os.Stderr, " %s convert --set-format flatbuffers --id 1 Set format for operation\n", os.Args[0]) |
| 56 | + } |
| 57 | + |
| 58 | + if err := fs.Parse(args); err != nil { |
| 59 | + return err |
| 60 | + } |
| 61 | + |
| 62 | + setting, err := server.NewSetting() |
| 63 | + if err != nil { |
| 64 | + return fmt.Errorf("setting: %w", err) |
| 65 | + } |
| 66 | + |
| 67 | + repo, err := server.NewRepoOperation(setting.DB) |
| 68 | + if err != nil { |
| 69 | + return fmt.Errorf("operation: %w", err) |
| 70 | + } |
| 71 | + |
| 72 | + ctx := context.Background() |
| 73 | + |
| 74 | + switch { |
| 75 | + case *status: |
| 76 | + return showConversionStatus(ctx, repo) |
| 77 | + |
| 78 | + case *setFormat != "": |
| 79 | + if *opID == 0 { |
| 80 | + return fmt.Errorf("--id is required when using --set-format") |
| 81 | + } |
| 82 | + if err := repo.UpdateStorageFormat(ctx, *opID, *setFormat); err != nil { |
| 83 | + return fmt.Errorf("update format: %w", err) |
| 84 | + } |
| 85 | + log.Printf("Updated operation %d to format: %s", *opID, *setFormat) |
| 86 | + return showConversionStatus(ctx, repo) |
| 87 | + |
| 88 | + case *inputFile != "": |
| 89 | + return convertSingleFile(ctx, *inputFile, setting.Data, uint32(*chunkSize), *format) |
| 90 | + |
| 91 | + case *all: |
| 92 | + return convertAll(ctx, repo, setting, uint32(*chunkSize), *format) |
| 93 | + |
| 94 | + default: |
| 95 | + fs.Usage() |
| 96 | + return nil |
| 97 | + } |
| 98 | +} |
| 99 | + |
| 100 | +func showConversionStatus(ctx context.Context, repo *server.RepoOperation) error { |
| 101 | + ops, err := repo.Select(ctx, server.Filter{}) |
| 102 | + if err != nil { |
| 103 | + return fmt.Errorf("select operations: %w", err) |
| 104 | + } |
| 105 | + |
| 106 | + fmt.Printf("%-6s %-30s %-10s %-12s\n", "ID", "Mission Name", "Format", "Status") |
| 107 | + fmt.Println(string(make([]byte, 62))) |
| 108 | + |
| 109 | + for _, op := range ops { |
| 110 | + name := op.MissionName |
| 111 | + if len(name) > 28 { |
| 112 | + name = name[:28] + ".." |
| 113 | + } |
| 114 | + fmt.Printf("%-6d %-30s %-10s %-12s\n", |
| 115 | + op.ID, name, op.StorageFormat, op.ConversionStatus) |
| 116 | + } |
| 117 | + |
| 118 | + return nil |
| 119 | +} |
| 120 | + |
| 121 | +func convertSingleFile(ctx context.Context, inputFile, dataDir string, chunkSize uint32, format string) error { |
| 122 | + // Determine output path |
| 123 | + baseName := filepath.Base(inputFile) |
| 124 | + if ext := filepath.Ext(baseName); ext == ".gz" { |
| 125 | + baseName = baseName[:len(baseName)-len(ext)] |
| 126 | + } |
| 127 | + if ext := filepath.Ext(baseName); ext == ".json" { |
| 128 | + baseName = baseName[:len(baseName)-len(ext)] |
| 129 | + } |
| 130 | + outputPath := filepath.Join(dataDir, baseName) |
| 131 | + |
| 132 | + log.Printf("Converting %s to %s (format: %s, chunk size: %d)", inputFile, outputPath, format, chunkSize) |
| 133 | + |
| 134 | + // Register engines if not already done |
| 135 | + storage.RegisterEngine(storage.NewProtobufEngine(dataDir)) |
| 136 | + storage.RegisterEngine(storage.NewFlatBuffersEngine(dataDir)) |
| 137 | + |
| 138 | + // Get the appropriate storage engine |
| 139 | + engine, err := storage.GetEngine(format) |
| 140 | + if err != nil { |
| 141 | + return fmt.Errorf("unknown format %q: %w", format, err) |
| 142 | + } |
| 143 | + |
| 144 | + if err := engine.Convert(ctx, inputFile, outputPath); err != nil { |
| 145 | + return fmt.Errorf("conversion failed: %w", err) |
| 146 | + } |
| 147 | + |
| 148 | + log.Printf("Conversion complete: %s", outputPath) |
| 149 | + return nil |
| 150 | +} |
| 151 | + |
| 152 | +func convertAll(ctx context.Context, repo *server.RepoOperation, setting server.Setting, chunkSize uint32, format string) error { |
| 153 | + operations, err := repo.SelectAll(ctx) |
| 154 | + if err != nil { |
| 155 | + return fmt.Errorf("select operations: %w", err) |
| 156 | + } |
| 157 | + |
| 158 | + if len(operations) == 0 { |
| 159 | + log.Println("No operations to convert") |
| 160 | + return nil |
| 161 | + } |
| 162 | + |
| 163 | + log.Printf("Found %d operations to convert (format: %s)", len(operations), format) |
| 164 | + |
| 165 | + worker := conversion.NewWorker( |
| 166 | + &repoAdapter{repo}, |
| 167 | + conversion.Config{ |
| 168 | + DataDir: setting.Data, |
| 169 | + ChunkSize: chunkSize, |
| 170 | + StorageFormat: format, |
| 171 | + }, |
| 172 | + ) |
| 173 | + |
| 174 | + for _, op := range operations { |
| 175 | + log.Printf("Converting operation %d: %s", op.ID, op.Filename) |
| 176 | + if err := worker.ConvertOne(ctx, op.ID, op.Filename); err != nil { |
| 177 | + log.Printf("Error converting %s: %v", op.Filename, err) |
| 178 | + // Update status to failed |
| 179 | + repo.UpdateConversionStatus(ctx, op.ID, "failed") |
| 180 | + } |
| 181 | + } |
| 182 | + |
| 183 | + // Show final status |
| 184 | + fmt.Println() |
| 185 | + return showConversionStatus(ctx, repo) |
| 186 | +} |
| 187 | + |
| 188 | +func app() error { |
| 189 | + setting, err := server.NewSetting() |
| 190 | + if err != nil { |
| 191 | + return fmt.Errorf("setting: %w", err) |
| 192 | + } |
| 193 | + |
| 194 | + // Configure structured JSON logging |
| 195 | + var logOutput io.Writer = os.Stdout |
| 196 | + var flog *os.File |
| 197 | + if setting.Logger { |
| 198 | + flog, err = os.OpenFile("ocap.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) |
| 199 | + if err != nil { |
| 200 | + return fmt.Errorf("open logger file: %w", err) |
| 201 | + } |
| 202 | + defer flog.Close() |
| 203 | + logOutput = io.MultiWriter(os.Stdout, flog) |
| 204 | + } |
| 205 | + |
| 206 | + // Set up slog with JSON handler for consistent logging |
| 207 | + slog.SetDefault(slog.New(slog.NewJSONHandler(logOutput, nil))) |
| 208 | + |
| 209 | + operation, err := server.NewRepoOperation(setting.DB) |
| 210 | + if err != nil { |
| 211 | + return fmt.Errorf("operation: %w", err) |
| 212 | + } |
| 213 | + |
| 214 | + marker, err := server.NewRepoMarker(setting.Markers) |
| 215 | + if err != nil { |
| 216 | + return fmt.Errorf("marker: %w", err) |
| 217 | + } |
| 218 | + |
| 219 | + ammo, err := server.NewRepoAmmo(setting.Ammo) |
| 220 | + if err != nil { |
| 221 | + return fmt.Errorf("ammo: %w", err) |
| 222 | + } |
| 223 | + |
| 224 | + e := echo.New() |
| 225 | + |
| 226 | + loggerConfig := middleware.DefaultLoggerConfig |
| 227 | + loggerConfig.Output = logOutput |
| 228 | + |
| 229 | + e.Use( |
| 230 | + middleware.LoggerWithConfig(loggerConfig), |
| 231 | + ) |
| 232 | + |
| 233 | + // Create conversion worker if enabled (before handler so we can pass it) |
| 234 | + ctx, cancel := context.WithCancel(context.Background()) |
| 235 | + defer cancel() |
| 236 | + |
| 237 | + var handlerOpts []server.HandlerOption |
| 238 | + if setting.Conversion.Enabled { |
| 239 | + interval, err := time.ParseDuration(setting.Conversion.Interval) |
| 240 | + if err != nil { |
| 241 | + log.Printf("Invalid conversion interval %q, using default 5m", setting.Conversion.Interval) |
| 242 | + interval = 5 * time.Minute |
| 243 | + } |
| 244 | + |
| 245 | + worker := conversion.NewWorker( |
| 246 | + &repoAdapter{operation}, |
| 247 | + conversion.Config{ |
| 248 | + DataDir: setting.Data, |
| 249 | + Interval: interval, |
| 250 | + BatchSize: setting.Conversion.BatchSize, |
| 251 | + ChunkSize: setting.Conversion.ChunkSize, |
| 252 | + StorageFormat: setting.Conversion.StorageEngine, |
| 253 | + }, |
| 254 | + ) |
| 255 | + |
| 256 | + // Pass worker to handler for event-driven conversion on upload |
| 257 | + handlerOpts = append(handlerOpts, server.WithConversionTrigger(worker)) |
| 258 | + |
| 259 | + // Start background worker for retries and batch processing |
| 260 | + go worker.Start(ctx) |
| 261 | + } |
| 262 | + |
| 263 | + server.NewHandler(e, operation, marker, ammo, setting, handlerOpts...) |
| 264 | + |
| 265 | + // Handle graceful shutdown |
| 266 | + go func() { |
| 267 | + quit := make(chan os.Signal, 1) |
| 268 | + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) |
| 269 | + <-quit |
| 270 | + cancel() |
| 271 | + e.Shutdown(context.Background()) |
| 272 | + }() |
| 273 | + |
| 274 | + err = e.Start(setting.Listen) |
| 275 | + if err != nil { |
| 276 | + return fmt.Errorf("start server: %w", err) |
| 277 | + } |
| 278 | + |
| 279 | + return nil |
| 280 | +} |
| 281 | + |
| 282 | +// repoAdapter adapts server.RepoOperation to conversion.OperationRepo |
| 283 | +type repoAdapter struct { |
| 284 | + repo *server.RepoOperation |
| 285 | +} |
| 286 | + |
| 287 | +func (a *repoAdapter) SelectPending(ctx context.Context, limit int) ([]conversion.Operation, error) { |
| 288 | + ops, err := a.repo.SelectPending(ctx, limit) |
| 289 | + if err != nil { |
| 290 | + return nil, err |
| 291 | + } |
| 292 | + result := make([]conversion.Operation, len(ops)) |
| 293 | + for i, op := range ops { |
| 294 | + result[i] = conversion.Operation{ |
| 295 | + ID: op.ID, |
| 296 | + Filename: op.Filename, |
| 297 | + } |
| 298 | + } |
| 299 | + return result, nil |
| 300 | +} |
| 301 | + |
| 302 | +func (a *repoAdapter) UpdateConversionStatus(ctx context.Context, id int64, status string) error { |
| 303 | + return a.repo.UpdateConversionStatus(ctx, id, status) |
| 304 | +} |
| 305 | + |
| 306 | +func (a *repoAdapter) UpdateStorageFormat(ctx context.Context, id int64, format string) error { |
| 307 | + return a.repo.UpdateStorageFormat(ctx, id, format) |
| 308 | +} |
| 309 | + |
| 310 | +func (a *repoAdapter) UpdateMissionDuration(ctx context.Context, id int64, duration float64) error { |
| 311 | + return a.repo.UpdateMissionDuration(ctx, id, duration) |
| 312 | +} |
0 commit comments