
endpoint.online() not actually meaning endpoint is online? #3713

@gold-silver-copper

Description


When testing connections by EndpointId, I ran into an issue: I could not reliably connect to an endpoint unless I made the task sleep for a while first. Even after endpoint.online().await, there seems to be no guarantee that the endpoints are actually "online" in the sense of being reachable. Am I failing to account for something, or is there a better API?

Basically, in the following code (just the echo example with EndpointAddr changed to EndpointId), if the tokio::time::sleep duration is 0 ms, the connection fails every time; at 200 ms it fails roughly 50% of the time; at 2000 ms it connects essentially 100% of the time.

The error when it fails is:

```text
Error: No addressing information available
Caused by:
    Discovery service failed
    Discovery produced no results for b57736e18d
```
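A toy model of what the timings above suggest, in plain std-only Rust with no iroh involved. It assumes (this is not confirmed by the iroh docs) that online() resolves once the endpoint itself is ready, while the record that lets another endpoint look it up by EndpointId only becomes resolvable some variable time later. ToyDiscovery and the latency values are hypothetical; the model only shows that a fixed sleep succeeds exactly when it happens to exceed that latency, which would be consistent with the 0 / 200 / 2000 ms results.

```rust
use std::time::Duration;

/// Hypothetical stand-in for a discovery service (NOT iroh's API): the
/// record for an endpoint only becomes resolvable `publish_latency` after
/// that endpoint reported itself online. This delay is an assumption.
struct ToyDiscovery {
    publish_latency: Duration,
}

impl ToyDiscovery {
    /// Look the endpoint up `elapsed` after it reported online.
    fn resolve(&self, elapsed: Duration) -> Result<&'static str, &'static str> {
        if elapsed >= self.publish_latency {
            Ok("addressing information")
        } else {
            Err("Discovery produced no results")
        }
    }
}

fn main() {
    // Made-up publish latencies, standing in for varying network conditions.
    let latencies = [150u64, 250, 900].map(Duration::from_millis);
    for sleep_ms in [0u64, 200, 2000] {
        let sleep = Duration::from_millis(sleep_ms);
        // Count how many of the simulated runs would find a record after `sleep`.
        let ok = latencies
            .iter()
            .filter(|&&latency| ToyDiscovery { publish_latency: latency }.resolve(sleep).is_ok())
            .count();
        println!("sleep {sleep_ms:>4} ms: {ok}/{} lookups succeed", latencies.len());
    }
}
```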
```rust
//! Very basic example to showcase how to use iroh's APIs.
//!
//! This example implements a simple protocol that echoes any data sent to it in the first stream.
//!
//! ## Usage
//!
//!     cargo run --example echo --features=examples

use std::time::Duration;

use iroh::{
    Endpoint, EndpointId,
    endpoint::Connection,
    protocol::{AcceptError, ProtocolHandler, Router},
};
use n0_error::{Result, StdResultExt};

/// Each protocol is identified by its ALPN string.
///
/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake,
/// and the connection is aborted unless both endpoints pass the same bytestring.
const ALPN: &[u8] = b"iroh-example/echo/0";

#[tokio::main]
async fn main() -> Result<()> {
    let router = start_accept_side().await?;

    // Wait for the endpoint to be online.
    router.endpoint().online().await;

    // Connect using only the endpoint id, so the addressing information has to come from discovery.
    connect_side(router.endpoint().id()).await?;

    // This makes sure the endpoint in the router is closed properly and connections close gracefully.
    router.shutdown().await.anyerr()?;

    Ok(())
}

async fn connect_side(remote_id: EndpointId) -> Result<()> {
    let endpoint = Endpoint::bind().await?;
    endpoint.online().await;

    // With 0 ms the connect below always fails; with 2000 ms it almost always succeeds.
    tokio::time::sleep(Duration::from_millis(0)).await;

    // Open a connection to the accepting endpoint.
    let conn = endpoint.connect(remote_id, ALPN).await?;

    // Open a bidirectional QUIC stream.
    let (mut send, mut recv) = conn.open_bi().await.anyerr()?;

    // Send some data to be echoed.
    send.write_all(b"Hello, world!").await.anyerr()?;

    // Signal the end of data for this particular stream.
    send.finish().anyerr()?;

    // Receive the echo, reading at most 1000 bytes.
    let response = recv.read_to_end(1000).await.anyerr()?;
    assert_eq!(&response, b"Hello, world!");

    // Explicitly close the whole connection.
    conn.close(0u32.into(), b"bye!");

    // The above call only queues a close message to be sent (note that it's not async!).
    // We need to actually call this to make sure the message is sent out.
    endpoint.close().await;
    // If we don't call this but continue using the endpoint, the queued
    // close call will eventually be picked up and sent.
    // But always try to wait for endpoint.close().await to go through before dropping
    // the endpoint, to ensure any queued messages are sent and connections are
    // closed gracefully.
    Ok(())
}

async fn start_accept_side() -> Result<Router> {
    let endpoint = Endpoint::bind().await?;

    endpoint.online().await;
    // Build our protocol handler, add our protocol (identified by its ALPN), and spawn the endpoint.
    let router = Router::builder(endpoint).accept(ALPN, Echo).spawn();
    Ok(router)
}

#[derive(Debug, Clone)]
struct Echo;

impl ProtocolHandler for Echo {
    /// The `accept` method is called for each incoming connection for our ALPN.
    ///
    /// The returned future runs on a newly spawned tokio task, so it can run as long as
    /// the connection lasts.
    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
        // We can get the remote's endpoint id from the connection.
        let endpoint_id = connection.remote_id();
        println!("accepted connection from {endpoint_id}");

        // Our protocol is a simple request-response protocol, so we expect the
        // connecting peer to open a single bidirectional stream.
        let (mut send, mut recv) = connection.accept_bi().await?;

        // Echo any bytes received back directly.
        // This keeps copying until the sender signals the end of data on the stream.
        let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
        println!("Copied over {bytes_sent} byte(s)");

        // By calling `finish` on the send stream we signal that we will not send anything
        // further, which makes the receive stream on the other end terminate.
        send.finish()?;

        // Wait until the remote closes the connection, which it does once it
        // has received the response.
        connection.closed().await;

        Ok(())
    }
}
```
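Instead of a fixed sleep, one workaround is to retry the connect with exponential backoff under an overall deadline. This is a synchronous, std-only sketch: retry_with_backoff and its parameters are hypothetical names, not iroh API, and the closure stands in for the endpoint.connect call in connect_side.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of iroh): call `op` until it returns `Ok`,
/// sleeping with exponential backoff between attempts, and give up with the
/// last error once the next sleep would overshoot `deadline`.
fn retry_with_backoff<T, E>(
    deadline: Duration,
    initial_delay: Duration,
    max_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let start = Instant::now();
    let mut delay = initial_delay;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) => {
                // Stop if sleeping again would take us past the deadline.
                if start.elapsed() + delay > deadline {
                    return Err(err);
                }
                thread::sleep(delay);
                // Double the delay, but never beyond `max_delay`.
                delay = (delay * 2).min(max_delay);
            }
        }
    }
}

fn main() {
    let mut attempts = 0;
    // Stand-in for a connect by EndpointId: fails twice, as if discovery
    // had no record yet, then succeeds. Purely simulated.
    let result: Result<&str, &str> = retry_with_backoff(
        Duration::from_secs(5),
        Duration::from_millis(50),
        Duration::from_millis(1000),
        || {
            attempts += 1;
            if attempts < 3 {
                Err("No addressing information available")
            } else {
                Ok("connected")
            }
        },
    );
    println!("{result:?} after {attempts} attempt(s)");
}
```

In the async example, the same loop would await endpoint.connect and use tokio::time::sleep in place of std::thread::sleep. A real version should probably retry only on the "No addressing information available" error rather than on every failure. Passing the accept side's full EndpointAddr, as the unmodified echo example does, presumably avoids the discovery lookup altogether.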

Metadata

Assignees: No one assigned

Labels: discovery (iroh's endpoint discovery subsystem), docs (Improvements or additions to documentation)

Type: No type

Projects / Status: No status

Milestone: No milestone

Relationships: None yet

Development: No branches or pull requests