-
Notifications
You must be signed in to change notification settings - Fork 110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RSDK-8982, RSDK-9167, RSDK-8979] - Add SetStreamOptions to stream server #4530
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stream state and server are still maturing concepts in my head. Tried my best to focus on code quality stuff and learned stuff about the former along the way :)
robot/web/stream/state/state.go
Outdated
@@ -61,6 +64,7 @@ func New( | |||
robot: r, | |||
msgChan: make(chan msg), | |||
tickChan: make(chan struct{}), | |||
resized: false, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit suggestion] could we call this isResized
or wasResized
the past-tense-as-a-bool is kinda overlapping with certain interface naming conventions e.g. resource.Named
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. Changed to isResized
.
robot/web/stream/server.go
Outdated
return nil, errors.New("stream name is required") | ||
} | ||
if req.Resolution == nil { | ||
return nil, errors.New("resolution is required") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we prefix these errors with what stream it's about i.e. include the stream name in the error message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call. Added name to error returns and more details where appropriate.
robot/web/stream/stream_test.go
Outdated
test.That(t, getStreamOptionsResp, test.ShouldNotBeNil) | ||
test.That(t, len(getStreamOptionsResp.Resolutions), test.ShouldEqual, 5) | ||
|
||
// Test setting stream options with invalid name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[potential improvement] Is it possible to use use t.Run instead of flat tests with comments? Unless we want the tests to be sequential and do not want independent isolated execution?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah good call - better debugging if things fail. I think t.Run's should still be sequential if under a parent test fn (we would have to manually call t.Parallel()
in the subtest).
robot/web/stream/stream_test.go
Outdated
test.That(tb, videoCnt, test.ShouldEqual, 2) | ||
}) | ||
|
||
// Try setting stream options with RTPPassthrough enabled |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way to write a test to assert that RTP passthrough is off? Or is that inherently tested through using a non-standard res/the stream state test file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Testing this specifically in state_test.go
. This is just testing that the API call succeeds and track is still available.
robot/web/stream/state/state_test.go
Outdated
<-writeRTPCalledCtx.Done() | ||
|
||
// Call Resize to simulate a resize event. This should stop rtp_passthrough and start gostream. | ||
logger.Info("calling Resize() should stop rtp_passthrough and start gostream") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here might be nice to use t.Run
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep should be ok to nest t.Run tests here.
robot/web/stream/server.go
Outdated
@@ -336,6 +336,8 @@ func (server *Server) GetStreamOptions( | |||
ctx context.Context, | |||
req *streampb.GetStreamOptionsRequest, | |||
) (*streampb.GetStreamOptionsResponse, error) { | |||
server.mu.RLock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to add this read lock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The mutex is protecting these struct members:
nameToStreamState map[string]*state.StreamState
activePeerStreams map[*webrtc.PeerConnection]map[string]*peerState
activeBackgroundWorkers sync.WaitGroup
isAlive bool
I don't see them being accessed so I don't understand why this mutex is being taken
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call - I am just pulling the camera from robot not accessing any of these. Removed.
robot/web/stream/server.go
Outdated
if req.Name == "" { | ||
return nil, errors.New("stream name is required in request") | ||
} | ||
if req.Resolution == nil { | ||
return nil, fmt.Errorf("resolution is required to resize stream %q", req.Name) | ||
} | ||
if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 { | ||
return nil, fmt.Errorf( | ||
"invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0", | ||
req.Name, req.Resolution.Width, req.Resolution.Height, | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets move this into a validation function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good - moved to helper.
robot/web/stream/server.go
Outdated
if err != nil { | ||
return nil, fmt.Errorf("failed to resize video source for stream %q: %w", req.Name, err) | ||
} | ||
streamState, ok := server.nameToStreamState[req.Name] | ||
if !ok { | ||
return nil, fmt.Errorf("stream state not found with name %q", req.Name) | ||
} | ||
err = streamState.Resize() | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to resize stream %q: %w", req.Name, err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we move this logic into resizeVideoSource
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep -- moved all resize logic into resizeVideoSource
.
@@ -185,6 +200,10 @@ func (state *StreamState) sourceEventHandler() { | |||
if state.activeClients == 0 { | |||
state.tick() | |||
} | |||
case msgTypeResize: | |||
state.logger.Debug("resize event received") | |||
state.isResized = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we ever go back to not resized?
t.Run("when in rtppassthrough mode and a resize occurs test downgrade path to gostream", func(t *testing.T) { | ||
var startCount atomic.Int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A lot of your tests are time based and waiting for assertions, can you run with -race
and -failfast
in canon to verify that they're not flaky.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
go test -timeout 30s -race -failfast -run ^TestSetStreamOptions$ go.viam.com/rdk/robot/web/stream
ok go.viam.com/rdk/robot/web/stream 3.255s
go test -timeout 30s -race -failfast -run ^TestStreamState$/^when_in_rtppassthrough_mode_and_a_resize_occurs_test_downgrade_path_to_gostream$ go.viam.com/rdk/robot/web/stream/state
ok go.viam.com/rdk/robot/web/stream/state 3.056s
@@ -271,7 +302,7 @@ func (state *StreamState) tick() { | |||
case state.streamSource == streamSourcePassthrough: | |||
// no op if we are using passthrough & are healthy | |||
state.logger.Debug("still healthy and using h264 passthrough") | |||
case state.streamSource == streamSourceGoStream: | |||
case state.streamSource == streamSourceGoStream && !state.isResized: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does a user do to hit this state after they've selected a resolution in the UI?
Description
RSDK-8982, RSDK-9167 RSDK-8979
This PR adds the
SetStreamOptions
endpoint to the stream server.NewResizeVideoSource
.Resize
msg to thestreamState
handler.Increment
andDecrement
calls.Tests
NewStreamServiceClient
script):Demo
Screen.Recording.2024-11-12.at.2.05.18.PM.mov
TODO: