Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: TCP socket support using LibUV #6683

Open
wants to merge 44 commits into
base: master
Choose a base branch
from

Conversation

algebraic-dev
Copy link
Contributor

This PR introduces TCP socket support using the LibUV library, enabling asynchronous I/O operations with it.

algebraic-dev and others added 11 commits January 10, 2025 16:44
This PR adds support for `Timer` and a event loop thread that gets
requests from another threads and executes it.

---------

Co-authored-by: Markus Himmel <[email protected]>
Co-authored-by: Henrik Böving <[email protected]>
This PR implements a basic asynchronous timer API on top of the libuv
work.

It purposely puts this into `Std.Internal` as we might still have to
change the API as we continue develop of the async library across
releases so I would only like to stabilize it once we are certain this
is a fine API.

A few additional notes:
- we currently do not implement a bind operator on `AsyncTask` on
purpose as `Task.bind` on `Task.pure` is a non trivial operation and
users should be aware of it. Furthermore there is the consideration that
as they will have to bind on both `IO` and `AsyncTask` we might want to
make potential task points explicit in the syntax (did somebody say
`await`?).
- the API generally takes inspiration from
https://docs.rs/tokio/latest/tokio/time/index.html, though it has to
adapt as Rust's and Lean's asynchronity concepts are sufficiently
different.

Stacked on top of leanprover#6219.
@algebraic-dev algebraic-dev requested a review from TwoFX as a code owner January 17, 2025 23:53
@algebraic-dev algebraic-dev marked this pull request as draft January 17, 2025 23:53
src/runtime/uv/tcp.cpp Outdated Show resolved Hide resolved
src/runtime/uv/tcp.cpp Show resolved Hide resolved
src/runtime/uv/tcp.cpp Show resolved Hide resolved
src/runtime/uv/tcp.cpp Show resolved Hide resolved
src/runtime/uv/tcp.cpp Outdated Show resolved Hide resolved
src/runtime/uv/tcp.cpp Show resolved Hide resolved
src/runtime/uv/tcp.cpp Outdated Show resolved Hide resolved
src/runtime/uv/tcp.cpp Show resolved Hide resolved
src/runtime/uv/tcp.cpp Show resolved Hide resolved
@github-actions github-actions bot added the toolchain-available A toolchain is available for this PR, at leanprover/lean4-pr-releases:pr-release-NNNN label Jan 23, 2025
@leanprover-community-bot
Copy link
Collaborator

leanprover-community-bot commented Jan 23, 2025

Mathlib CI status (docs):

  • ❗ Batteries/Mathlib CI will not be attempted unless your PR branches off the nightly-with-mathlib branch. Try git rebase 9b629cc81f01642200eb0762da39b2935652754c --onto 9b74c07767dc50645efa00356a7724e7f7176227. (2025-01-23 05:57:38)
  • ❗ Batteries/Mathlib CI will not be attempted unless your PR branches off the nightly-with-mathlib branch. Try git rebase 104b3519d79f5996dd6d325cba18d94a4615e418 --onto 20c616503abe5ce4253c56dbcd7766a91c675ba0. (2025-01-30 00:33:23)
  • ❗ Batteries/Mathlib CI will not be attempted unless your PR branches off the nightly-with-mathlib branch. Try git rebase 104b3519d79f5996dd6d325cba18d94a4615e418 --onto a35bf7ee4c4d900475d88886825a5337f389bd0f. (2025-01-30 00:52:24)
  • ❗ Batteries/Mathlib CI will not be attempted unless your PR branches off the nightly-with-mathlib branch. Try git rebase 104b3519d79f5996dd6d325cba18d94a4615e418 --onto e922edfc218090ab2e54092336c67fe3b970dfc2. (2025-01-30 17:26:52)
  • ❗ Batteries/Mathlib CI will not be attempted unless your PR branches off the nightly-with-mathlib branch. Try git rebase 104b3519d79f5996dd6d325cba18d94a4615e418 --onto 832d7c500d709deb5e7f0a5a6fd0f01865d1a303. (2025-02-04 04:01:35)
  • ❗ Batteries/Mathlib CI will not be attempted unless your PR branches off the nightly-with-mathlib branch. Try git rebase 5eca093a89bc7dcd09e54b0a9c71bb3472d66709 --onto b01ca8ee237a1b3c299384e73ad79d424e216a84. (2025-02-08 00:17:56)

Copy link
Contributor

@hargoniX hargoniX left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is very promising now! There's mostly on memory leaks and stylistic things left but the implementation looks like it should be able to work now.

Beyond these issues it would of course be great if we could also have some tests like for Timer that exercise this API. I would assume you can just connect to localhost and do an echo or something like that? Of course we should have tests for both IPv4 and v6.

After we have those tests I guess it might also be a good idea to write a little benchmark to see how many TCP connections we can handle per second to get an estimate of what kind of load we can expect to handle at best given this implementation.

src/Std/Internal/UV/Tcp.lean Outdated Show resolved Hide resolved
src/runtime/uv/tcp.cpp Outdated Show resolved Hide resolved
src/runtime/uv/tcp.cpp Outdated Show resolved Hide resolved
src/runtime/uv/tcp.cpp Show resolved Hide resolved
src/runtime/uv/tcp.cpp Outdated Show resolved Hide resolved
src/runtime/uv/tcp.cpp Outdated Show resolved Hide resolved
src/runtime/uv/tcp.cpp Show resolved Hide resolved
src/runtime/uv/tcp.cpp Outdated Show resolved Hide resolved
src/runtime/uv/tcp.cpp Show resolved Hide resolved
src/runtime/uv/tcp.cpp Outdated Show resolved Hide resolved
src/runtime/uv/tcp.cpp Outdated Show resolved Hide resolved
src/runtime/uv/tcp.cpp Show resolved Hide resolved
src/runtime/uv/tcp.cpp Outdated Show resolved Hide resolved
src/runtime/uv/tcp.cpp Show resolved Hide resolved
src/runtime/uv/tcp.cpp Show resolved Hide resolved
src/runtime/uv/tcp.cpp Show resolved Hide resolved
src/runtime/uv/tcp.cpp Outdated Show resolved Hide resolved
@algebraic-dev algebraic-dev added awaiting-review Waiting for someone to review the PR changelog-library Library labels Feb 4, 2025
@algebraic-dev algebraic-dev marked this pull request as ready for review February 4, 2025 03:52
@algebraic-dev algebraic-dev requested a review from kim-em February 4, 2025 13:55
src/Std/Internal/UV/Tcp.lean Outdated Show resolved Hide resolved
src/Std/Internal/Async/Tcp.lean Outdated Show resolved Hide resolved
src/Std/Internal/Async/Tcp.lean Outdated Show resolved Hide resolved
src/Std/Internal/Async/Tcp.lean Outdated Show resolved Hide resolved
src/Std/Internal/Async/Tcp.lean Outdated Show resolved Hide resolved
src/Std/Internal/Async/Tcp.lean Outdated Show resolved Hide resolved
Receive data from the socket.
-/
@[inline]
def recv (s : Socket) (size : UInt64) : IO (AsyncTask ByteArray) :=
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that it's not well explained what exactly this function does.

  • What exactly does the size parameter represent? Is the ByteArray we eventually end up with always going to have size size?
  • When exactly does the returned task resolve?

I'm currently trying to port the following simple Java echo server:

        try (ServerSocket serverSocket = new ServerSocket(8080, 1, InetAddress.getLoopbackAddress())) {
            System.out.println("Echo server listening on localhost:8080");
            
            while (true) {
                System.out.println("Going to connect");
                try (Socket clientSocket = serverSocket.accept();
                     BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                     PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) {
                    System.out.println("Connected.");
                    String inputLine;
                    while ((inputLine = in.readLine()) != null) {
                        System.out.println("Received: " + inputLine);
                        out.println(inputLine);
                        System.out.println("Reading another...");
                    }
                    System.out.println("Loop exited...");
                } catch (IOException e) {
                    System.err.println("Error handling client: " + e.getMessage());
                }
            }
        } catch (IOException e) {
            System.err.println("Could not listen on port 8080: " + e.getMessage());
        }

But I'm not sure how to detect with the Lean API that the client has disconnected. I have the following Lean skeleton, and I don't know how to fill in the inner while condition.

def server : IO Unit := do
  let sock ← Socket.mk
  sock.bind (.v4 <| .mk (.ofParts 127 0 0 1) 8080)
  sock.listen 1
  while true do
    let connToClient ← (← sock.accept).block
    while ???????? do
      let receive ← connToClient.recv 10000
      let data ← receive.block
      let str := String.fromUTF8! data
      println! "Received: {str}"
      let send ← connToClient.send data
      send.block

Copy link
Contributor Author

@algebraic-dev algebraic-dev Feb 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The recv function will throw the error resource vanished (error code: 4294967242, connection reset by peer). You just need to catch the error and assume that it's not connected anymore.

The size parameter defines the maximum amount of data that can be received in a single call. This means the function will retrieve up to this limit, and any remaining data will be delivered in the next calls.

The code now is:

def server : IO Unit := do
  let sock ← Socket.Server.mk

  try
    sock.bind (.v4 <| .mk (.ofParts 127 0 0 1) 8080)
  catch err =>
    IO.eprintln s!"ould not listen on port 8080: {err}"
    return

  sock.listen 1
  IO.println "Echo server listening on localhost:8080"

  while true do
    IO.println "Going to connect"
    let connToClient ← (← sock.accept).block

    while true do
      try
        let receive ← connToClient.recv 10000
        let data ← receive.block
        println! "Received: {data.size}"
      catch err =>
        match err with
        | .resourceVanished _ _ => do IO.println "Disconnected"; break
        | err => IO.eprintln s!"Error handling client: {err}"
    IO.println "Loop exited.."

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks!
Do you think it would be feasible to add an overview to all of the docstrings about which IO errors callers may want to handle? At the moment it seems to me that it's quite difficult to write robust code without a lot of knowledge about the low-level details.

Copy link
Member

@TwoFX TwoFX Feb 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, perhaps it would be possible to add functions that allow inspecting the state of the socket without trying something and looking at the errors? Like Socket.Client.isBound, Socket.Client.isConnected and so on?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
awaiting-review Waiting for someone to review the PR changelog-library Library toolchain-available A toolchain is available for this PR, at leanprover/lean4-pr-releases:pr-release-NNNN
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants