Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(datastreams): initial impl #622

Open
wants to merge 9 commits into
base: feat/rpc
Choose a base branch
from
Open

Conversation

anunaym14
Copy link

@anunaym14 anunaym14 commented Mar 6, 2025

Have a bunch of questions added as comments, please check those!

TODO:
- Docs
- Attachments for text streams
- Sanitize example
- Removing deprecated DataPacket.Kind usage
- #622 (comment)

@anunaym14 anunaym14 requested a review from a team March 6, 2025 17:24
Copy link
Contributor

@boks1971 boks1971 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! But, I do not understand the nuances of the e2e flow and the protocol. Left some comments.

engine.go Outdated

func (e *RTCEngine) waitForBufferStatusLow(kind livekit.DataPacket_Kind) {
// will block forever if dc is nil
// probably have a timeout here?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just return immediately if nil? Haven't read the full code to understand the use for this. Maybe, returning immediately is not the right thing, but seems like valid behaviour if dc can be nil.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually return an error and bubble that up all the way?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually return an error and bubble that up all the way?

Yeah I was thinking the same

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, but error propagation is a bit tricky, please check the newly added comments

Copy link
Author

@anunaym14 anunaym14 Mar 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this again, we should not return an error if the datachannel isn't available, instead we should wait for it to come up, then all the publish calls won't have any reason to fail either, so we can get rid of the error propagation and call this in a goroutine internally.

The above ideology fails if a data channel closes unexpectedly, in that case, do we open a new one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opening a new data channel is an option, but we do not do it anywhere now AFAIK. So, should make sure that server can handle it properly.

@anunaym14
Copy link
Author

@boks1971 please check this again, I'll replied to some of the comments and marked others which are done as resolved. Feel free to mark them as unresolved if you think something needs a change

Copy link
Contributor

@boks1971 boks1971 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for addressing comments @anunaym14 . I think there are still bits that need some clarification. Have left some comments.

@anunaym14 anunaym14 marked this pull request as ready for review March 13, 2025 13:50
// EOF represents the end of the stream
if err == io.EOF {
break
} else if err == lksdk.EAGAIN {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you leaving this to the user? I thought we could just do it internally and externally, the user either gets data or io.EOF. WDYT?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need ErrAgain anyway, because, if there are some bytes to read, but not enough, the Go APIs will return the available data with an EOF. In that case, before stream close, we'll have to notify that to the user with ErrAgain (better to return what we have than waiting for more). And since, the user will be handling ErrAgain anyway for that case, it might make sense to let them handle it for no bytes available case as well.

But I understand the pros of doing it internally as well, I don't really have a hard preference among the two.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood, but feels like it can be handled internally.

notify that to the user with ErrAgain (better to return what we have than waiting for more)
I am not understanding. I think this can return (num_bytes_read, nil) and user would take those bytes and read again.

Internally, if we get (o, io.EOF) from Go APIs before the stream end, we just loop and go back and block on read?

Copy link
Author

@anunaym14 anunaym14 Mar 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So block internally just on (0, io.EOF)?

In cases of (n, io.EOF) where n > 0, do we return ErrAgain with n? Will modify the error message to day "not enough bytes available"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe my understanding is incomplete, but this what I am thinking

user

for {
    if chunk, err := stream.Read(); err != nil {
      // done
      break
    } else {
       data += chunk
    }

internally

func Read() ([]byte, error) {
     buf, err := buffer.Read()
     // unclear to me how the end of stream is known, assuming that is known
     if end-of-stream {
          return buf, io.EOF
     }. else if len(buf) > 0 {
         return buf, nil
     } else {
          // len(buf) == 0 and err is probably io.EOF
          time.Sleep(10 * time.Millisecond)
     }
}

Effectively, Read() blocks till there is data or end of stream.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. Just one doubt:

The Read() method takes []byte say p and returns len(p) bytes from the buffer, if buffer has less bytes, it returns whatever is available with err == nil and only returns io.EOF when the buffer has nothing to read from.

But, the ReadBytes() method takes a delim byte, and returns till the delim including the delim itself. But, if delim isn't present in the buffer, it returns everything available but with io.EOF.

How should we handle this? Do we return nil in these cases (before stream closure), or do we return either or ErrAgain or io.EOF?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think io.EOF should be returned only on stream end. As long we know the stream end and can return io.EOF only on that, that is clear.

For other cases

  • Read() has data, internally get (n, nil) -> pass that on to the user

  • Read() has no data, internally get (0, io.EOF) -> internally we know this is not end of stream, so loop and read again

  • ReadBytes() has delim - internally get (n, nil) -> pass that on to the user

  • ReadBytes() has no delim - internally get (n, i.EOF) -> this seems to be only tricky case -> I think we should return (n, nil) to the user, the reason I am saying nil here is ErrAgain or nil does not make a difference to the user. They just have to read till stream end. So, what I am proposing to not return an extra error unless there is some well-defined clear use that is not covered by other paths.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the latest commit, this should be handled properly now!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants