Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e257a21
Added SMP support to runners
SebiWrn Oct 15, 2025
f1ee130
lint fix
SebiWrn Oct 15, 2025
0921d53
Update runner/pkg/actions/stream.go
SebiWrn Oct 16, 2025
937d9b4
Update runner/pkg/actions/stream_end.go
SebiWrn Oct 16, 2025
a47752b
Update runner/serverhandler.go
SebiWrn Oct 16, 2025
8d13d7b
Update pkg/runner_manager/manager.go
SebiWrn Oct 16, 2025
4900eee
Suggested changes
SebiWrn Oct 16, 2025
252e78d
Suggested changes
SebiWrn Oct 16, 2025
8e9e867
Default outputOptions
SebiWrn Oct 16, 2025
037ef4b
Add selfstream to runners
SebiWrn Oct 15, 2025
c406fe2
Fixed failing test
SebiWrn Oct 15, 2025
eb8a2c4
Empty commit to trigger actions
SebiWrn Oct 15, 2025
f584c2a
Fixed a bug that starts multiple selfstreams for the same stream
SebiWrn Oct 16, 2025
515502a
Adapted bitrates
SebiWrn Oct 16, 2025
a6a86eb
golangci-lint fix
SebiWrn Oct 16, 2025
8a1e514
Merge branch 'dev' into runner/add-selfstream
SebiWrn Oct 16, 2025
cf4e33d
Suggested changes
SebiWrn Oct 17, 2025
4df8fc8
Added only post requests to onPublish
SebiWrn Oct 17, 2025
8aaa8f2
Added ingest as own package
SebiWrn Oct 19, 2025
a5d9a13
Changed runner selfstream to not localhost
SebiWrn Oct 19, 2025
8ad7a71
Added TODO and moved mediamtx
SebiWrn Oct 19, 2025
6df2ea6
Corrected mediamtx
SebiWrn Oct 19, 2025
37df4da
Moved selfstreaming requests to main instance
SebiWrn Oct 19, 2025
24dfae1
Fixed gotest
SebiWrn Oct 19, 2025
ec18a49
Fixed golangci-lint
SebiWrn Oct 19, 2025
17d653b
Fixed golangci-lint
SebiWrn Oct 19, 2025
e6d7d95
Renamed folders
SebiWrn Oct 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 84 additions & 2 deletions pkg/runner_manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"fmt"
log "log/slog"
"net"
"os"
"strconv"
"strings"
"time"

"github.com/TUM-Dev/gocast/web"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -24,6 +26,10 @@ import (
"github.com/tum-dev/gocast/runner/protobuf"
)

var logger = log.New(log.NewJSONHandler(os.Stdout, &log.HandlerOptions{
Level: log.LevelDebug,
})).With("service", "runner_manager")

// Manager manages communication with runners and handles job distribution
type Manager struct {
dao dao.DaoWrapper
Expand All @@ -46,6 +52,12 @@ func (m *Manager) TriggerDueStreams() error {
log.Info("Triggering due streams")
ctx := context.Background()
streams, err := m.dao.GetDueStreamsForRunners()
// TODO: Remove this when turning off workers
if web.VersionTag == "development" {
workerStreams := m.dao.GetDueStreamsForWorkers()
streams = append(streams, workerStreams...)
}

log.Info(fmt.Sprintf("%d streams to start for runner", len(streams)))
if err != nil {
return err
Expand Down Expand Up @@ -240,15 +252,85 @@ func (m *Manager) requestStreamVersion(ctx context.Context, s model.Stream, clie
default:
return nil, fmt.Errorf("invalid stream version %v", version)
}

outputOptions := "-c:v libx264 -preset veryfast -tune zerolatency -c:a aac -ar 44100 -b:a 128k -g 60 -bufsize 5000k -maxrate 5000k -b:v 3500k"
if !s.IsSelfStream() {
switch lh.StreamProtocol {
case model.RTSP:
outputOptions = "-c:a copy -c:v copy -rtsp_transport tcp -preset veryfast -tune zerolatency"
case model.SRT:
outputOptions = "-c:a copy -c:v copy -preset veryfast -tune zerolatency"
}
}

var input string
if !s.IsSelfStream() {
switch lh.StreamProtocol {
case model.RTSP:
input = fmt.Sprintf("rtsp://%s", ip)
case model.SRT:
input = fmt.Sprintf("srt://%s", ip)
default:
return nil, fmt.Errorf("invalid stream protocol %v", lh.StreamProtocol)
}
} else {
// Lecture is Selfstream
course, err := m.dao.CoursesDao.GetCourseById(ctx, s.CourseID)
if err != nil {
return nil, err
}
input = fmt.Sprintf("rtmp://localhost/%s-%d", course.Slug, s.ID)
}
return client.RequestStream(ctx, &protobuf.StreamRequest{
StreamId: ptr.Take(uint64(s.ID)),
Version: ptr.Take(version),
End: timestamppb.New(s.End),
FfmpegOutputOptions: ptr.Take("-c:a copy -c:v copy"),
Input: ptr.Take(fmt.Sprintf("srt://%s", ip)),
FfmpegOutputOptions: ptr.Take(outputOptions),
Input: ptr.Take(input),
})
}

func (m *Manager) RequestSelfStream(ctx context.Context, request *protobuf.SelfStreamRequest) (*protobuf.SelfStreamResponse, error) {
if _, err := m.dao.RunnerDao.Get(ctx, *request.Hostname); err != nil {
logger.Warn("Could not find runner for selfstream", "hostname", *request.Hostname)
return nil, err
}

if *request.StreamKey == "" {
return nil, errors.New("no stream key")
}
stream, err := m.dao.StreamsDao.GetStreamByKey(ctx, *request.StreamKey)
if err != nil {
return nil, err
}
course, err := m.dao.CoursesDao.GetCourseById(ctx, stream.CourseID)
if err != nil {
return nil, err
}
if *request.CourseSlug != fmt.Sprintf("%s-%d", course.Slug, stream.ID) {
return nil, fmt.Errorf("bad stream name, should: %s, is: %s", fmt.Sprintf("%s-%d", course.Slug, stream.ID), *request.CourseSlug)
}
// reject streams that are more than 30 minutes in the future or more than 30 minutes past
if !(time.Now().After(stream.Start.Add(time.Minute*-30)) && time.Now().Before(stream.End.Add(time.Minute*30))) {
logger.Warn("Stream rejected, time out of bounds", "streamId", stream.ID)
return nil, errors.New("stream rejected")
}

client, err := m.getClient(ctx)
if err != nil {
logger.Error("Could not get client", "err", err)
return nil, err
}

resp, err := m.requestStreamVersion(ctx, stream, client, model.LectureHall{}, protobuf.StreamVersion_STREAM_VERSION_COMBINED)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to make sure, this is requested at the same runner, that this request originates from, otherwise it will break as soon as we have more than one runner

Copy link
Collaborator Author

@SebiWrn SebiWrn Oct 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I forgot to implement this. An alternative would be to have only one mediaMTX instance at a time and then distribute the load of selfstreams evenly to the runners as every runner could fetch the stream from this instance and we wouldn't rely on only one machine for a stream (except for the mediaMTX). What do you think about this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like all runners to be treated the same, let's design this to work without magic knowledge of what which runner does

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we need load balancing to multiple ingest servers, let's use traefik and DNS rr to distribute streams to multiple runners

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means we only have one mediamtx for all instead of having one for each runner?

if err != nil && !errors.Is(err, errNotNoLectureSource) {
logger.Error("Could not start selfstream", "err", err)
return nil, err
}
log.With("stream", stream.ID, "job", resp.JobId, "version", model.COMB).Info("started Stream")
return &protobuf.SelfStreamResponse{}, nil
}

func dialRunner(ctx context.Context, runner model.Runner) (*grpc.ClientConn, error) {
return grpc.NewClient(fmt.Sprintf("%s:%d", runner.Hostname, runner.Port), grpc.WithTransportCredentials(insecure.NewCredentials()))
}
1 change: 1 addition & 0 deletions runner/cmd/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,6 @@ func main() {
}

blocking := make(chan struct{})
//nolint:all
_ = <-blocking
}
4 changes: 1 addition & 3 deletions runner/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ import (
"github.com/tum-dev/gocast/runner/protobuf"
)

func (r *Runner) RequestStream(ctx context.Context, req *protobuf.StreamRequest) (*protobuf.StreamResponse, error) {
ctx = context.Background()

func (r *Runner) RequestStream(_ context.Context, req *protobuf.StreamRequest) (*protobuf.StreamResponse, error) {
data := map[string]any{
"streamID": req.GetStreamId(),
"streamVersion": protobuf.StreamVersion_name[int32(req.GetVersion())],
Expand Down
4 changes: 2 additions & 2 deletions runner/hls.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ type HLSServer struct {
version string
}

func NewHLSServer(LiveDir string, log *slog.Logger, version string) *HLSServer {
return &HLSServer{fs: http.FileServer(http.Dir(LiveDir)), log: log, version: version}
func NewHLSServer(liveDir string, log *slog.Logger, version string) *HLSServer {
return &HLSServer{fs: http.FileServer(http.Dir(liveDir)), log: log, version: version}
}

func (h *HLSServer) Start() error {
Expand Down
2 changes: 1 addition & 1 deletion runner/pkg/actions/stream_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package actions
import (
"context"
"fmt"
"github.com/tum-dev/gocast/runner/pkg/ptr"
"log/slog"

"github.com/tum-dev/gocast/runner/pkg/ptr"
"github.com/tum-dev/gocast/runner/pkg/metrics"
"github.com/tum-dev/gocast/runner/protobuf"
)
Expand Down
17 changes: 15 additions & 2 deletions runner/pkg/netutil/netutil.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
package netutil

import "net"
import (
"log/slog"
"net"
"os"
)

var logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug,
})).With("service", "netutil")

// GetFreePort returns a free port for tcp use.
func GetFreePort() (port int, err error) {
var a *net.TCPAddr
if a, err = net.ResolveTCPAddr("tcp", "localhost:0"); err == nil {
var l *net.TCPListener
if l, err = net.ListenTCP("tcp", a); err == nil {
defer l.Close()
defer func(l *net.TCPListener) {
err := l.Close()
if err != nil {
logger.Error("failed to close listener: %v", "err", err)
}
}(l)
return l.Addr().(*net.TCPAddr).Port, nil
}
}
Expand Down
35 changes: 14 additions & 21 deletions runner/protobuf/commons.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading