-
Notifications
You must be signed in to change notification settings - Fork 55
Add selfstreaming feature to the runners #1657
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from 15 commits
e257a21
f1ee130
0921d53
937d9b4
a47752b
8d13d7b
4900eee
252e78d
8e9e867
037ef4b
c406fe2
eb8a2c4
f584c2a
515502a
a6a86eb
8a1e514
cf4e33d
4df8fc8
8aaa8f2
a5d9a13
8ad7a71
6df2ea6
37df4da
24dfae1
ec18a49
17d653b
e6d7d95
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,10 +6,12 @@ import ( | |
| "fmt" | ||
| log "log/slog" | ||
| "net" | ||
| "os" | ||
| "strconv" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/TUM-Dev/gocast/web" | ||
| "google.golang.org/grpc" | ||
| "google.golang.org/grpc/codes" | ||
| "google.golang.org/grpc/credentials/insecure" | ||
|
|
@@ -24,6 +26,10 @@ import ( | |
| "github.com/tum-dev/gocast/runner/protobuf" | ||
| ) | ||
|
|
||
| var logger = log.New(log.NewJSONHandler(os.Stdout, &log.HandlerOptions{ | ||
| Level: log.LevelDebug, | ||
| })).With("service", "runner_manager") | ||
|
|
||
| // Manager manages communication with runners and handles job distribution | ||
| type Manager struct { | ||
| dao dao.DaoWrapper | ||
|
|
@@ -46,6 +52,12 @@ func (m *Manager) TriggerDueStreams() error { | |
| log.Info("Triggering due streams") | ||
| ctx := context.Background() | ||
| streams, err := m.dao.GetDueStreamsForRunners() | ||
| // TODO: Remove this when turning off workers | ||
| if web.VersionTag == "development" { | ||
| workerStreams := m.dao.GetDueStreamsForWorkers() | ||
| streams = append(streams, workerStreams...) | ||
| } | ||
|
|
||
| log.Info(fmt.Sprintf("%d streams to start for runner", len(streams))) | ||
| if err != nil { | ||
| return err | ||
|
|
@@ -240,15 +252,85 @@ func (m *Manager) requestStreamVersion(ctx context.Context, s model.Stream, clie | |
| default: | ||
| return nil, fmt.Errorf("invalid stream version %v", version) | ||
| } | ||
|
|
||
| outputOptions := "-c:v libx264 -preset veryfast -tune zerolatency -c:a aac -ar 44100 -b:a 128k -g 60 -bufsize 5000k -maxrate 5000k -b:v 3500k" | ||
| if !s.IsSelfStream() { | ||
| switch lh.StreamProtocol { | ||
| case model.RTSP: | ||
| outputOptions = "-c:a copy -c:v copy -rtsp_transport tcp -preset veryfast -tune zerolatency" | ||
| case model.SRT: | ||
| outputOptions = "-c:a copy -c:v copy -preset veryfast -tune zerolatency" | ||
| } | ||
| } | ||
|
|
||
| var input string | ||
| if !s.IsSelfStream() { | ||
| switch lh.StreamProtocol { | ||
| case model.RTSP: | ||
| input = fmt.Sprintf("rtsp://%s", ip) | ||
| case model.SRT: | ||
| input = fmt.Sprintf("srt://%s", ip) | ||
| default: | ||
| return nil, fmt.Errorf("invalid stream protocol %v", lh.StreamProtocol) | ||
| } | ||
| } else { | ||
| // Lecture is Selfstream | ||
| course, err := m.dao.CoursesDao.GetCourseById(ctx, s.CourseID) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| input = fmt.Sprintf("rtmp://localhost/%s-%d", course.Slug, s.ID) | ||
| } | ||
| return client.RequestStream(ctx, &protobuf.StreamRequest{ | ||
| StreamId: ptr.Take(uint64(s.ID)), | ||
| Version: ptr.Take(version), | ||
| End: timestamppb.New(s.End), | ||
| FfmpegOutputOptions: ptr.Take("-c:a copy -c:v copy"), | ||
| Input: ptr.Take(fmt.Sprintf("srt://%s", ip)), | ||
| FfmpegOutputOptions: ptr.Take(outputOptions), | ||
| Input: ptr.Take(input), | ||
| }) | ||
| } | ||
|
|
||
| func (m *Manager) RequestSelfStream(ctx context.Context, request *protobuf.SelfStreamRequest) (*protobuf.SelfStreamResponse, error) { | ||
| if _, err := m.dao.RunnerDao.Get(ctx, *request.Hostname); err != nil { | ||
| logger.Warn("Could not find runner for selfstream", "hostname", *request.Hostname) | ||
| return nil, err | ||
| } | ||
|
|
||
| if *request.StreamKey == "" { | ||
SebiWrn marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return nil, errors.New("no stream key") | ||
| } | ||
| stream, err := m.dao.StreamsDao.GetStreamByKey(ctx, *request.StreamKey) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| course, err := m.dao.CoursesDao.GetCourseById(ctx, stream.CourseID) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if *request.CourseSlug != fmt.Sprintf("%s-%d", course.Slug, stream.ID) { | ||
| return nil, fmt.Errorf("bad stream name, should: %s, is: %s", fmt.Sprintf("%s-%d", course.Slug, stream.ID), *request.CourseSlug) | ||
| } | ||
| // reject streams that are more than 30 minutes in the future or more than 30 minutes past | ||
| if !(time.Now().After(stream.Start.Add(time.Minute*-30)) && time.Now().Before(stream.End.Add(time.Minute*30))) { | ||
| logger.Warn("Stream rejected, time out of bounds", "streamId", stream.ID) | ||
| return nil, errors.New("stream rejected") | ||
| } | ||
|
|
||
| client, err := m.getClient(ctx) | ||
| if err != nil { | ||
| logger.Error("Could not get client", "err", err) | ||
| return nil, err | ||
| } | ||
|
|
||
| resp, err := m.requestStreamVersion(ctx, stream, client, model.LectureHall{}, protobuf.StreamVersion_STREAM_VERSION_COMBINED) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to make sure, this is requested at the same runner, that this request originates from, otherwise it will break as soon as we have more than one runner
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right, I forgot to implement this. An alternative would be to have only one mediaMTX instance at a time and then distribute the load of selfstreams evenly to the runners as every runner could fetch the stream from this instance and we wouldn't rely on only one machine for a stream (except for the mediaMTX). What do you think about this?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd like all runners to be treated the same, let's design this to work without magic knowledge of what which runner does
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we need load balancing to multiple ingest servers, let's use traefik and DNS rr to distribute streams to multiple runners
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This means we only have one mediamtx for all instead of having one for each runner? |
||
| if err != nil && !errors.Is(err, errNotNoLectureSource) { | ||
| logger.Error("Could not start selfstream", "err", err) | ||
| return nil, err | ||
| } | ||
| log.With("stream", stream.ID, "job", resp.JobId, "version", model.COMB).Info("started Stream") | ||
| return &protobuf.SelfStreamResponse{}, nil | ||
| } | ||
|
|
||
| func dialRunner(ctx context.Context, runner model.Runner) (*grpc.ClientConn, error) { | ||
| return grpc.NewClient(fmt.Sprintf("%s:%d", runner.Hostname, runner.Port), grpc.WithTransportCredentials(insecure.NewCredentials())) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,5 +56,6 @@ func main() { | |
| } | ||
|
|
||
| blocking := make(chan struct{}) | ||
| //nolint:all | ||
| _ = <-blocking | ||
| } | ||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Uh oh!
There was an error while loading. Please reload this page.