
In older versions of the mio docs I found mio::channel, which apparently was used to create a channel implementing Evented that can be registered with Poll. I also saw on Reddit that this was dropped in favor of something else, but I can't figure out what the new way of registering a channel is.

What is the current way of waiting on a channel (std::sync::mpsc or something else) with mio? All I can find on Google are links to old versions of the mio docs. Thanks!

As I ran into the same problem, I hope it is okay that I add some code to make the question easier to understand:

use std::io;
use std::net::*; //{TcpListener,TcpStream,IpAddr,Ipv4Addr,SocketAddr};
use std::thread::*;
use std::sync::mpsc::*; //{Receiver,SyncSender,Sender,channel,sync_channel};

fn poll_quit( rx : &Receiver::<u8> ) -> bool {
    match rx.try_recv() {
        Ok(_) => true,
        Err(TryRecvError::Empty) => false,
        Err(TryRecvError::Disconnected) => true
    }
}

fn guard_external_tcp_port( rx : Receiver::<u8>) -> () {
    let listener = TcpListener::bind("127.0.0.1:8384").expect("tcp guard - bind failed!");
    listener.set_nonblocking(true).expect("cannot set tcp listener to nonblocking!");
    while false == poll_quit(&rx) {
        match listener.accept() {
            Ok((_s,pa)) => {
                println!("tcp client connected: {} - closing it down."
                    , pa);
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                //wait_for_fd();
                continue;
            }
            Err(e) => panic!("encountered IO error: {}", e),

        }
    }
}

fn start_tcpguard() -> (JoinHandle<()>,SyncSender::<u8>) {
    let (tx,rx) = sync_channel::<u8>(0);
    let thrd = spawn(move || {
        guard_external_tcp_port(rx);
    });
    (thrd,tx)
}

Up to this point I tried to do without mio, but the problem is obvious: you want to block until either the shutdown message arrives on the channel or the listener socket is signaled. That is incentive enough to switch to mio, unless mio cannot wait on a Receiver.
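To make the trade-off concrete, here is a minimal std-only sketch of the polling approach. It is not the code from the question: the names guard_port and start_guard, the unit-typed channel, and the 10 ms sleep are assumptions made for illustration. The sleep keeps the loop from spinning at full CPU, at the cost of up to 10 ms of added latency for both new connections and shutdown, which is exactly the compromise a real readiness wait would avoid.

```rust
use std::io;
use std::net::TcpListener;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Polls the shutdown channel and the listener in turn. Returns the number
/// of connections accepted before shutdown was requested.
fn guard_port(listener: TcpListener, rx: Receiver<()>) -> io::Result<usize> {
    listener.set_nonblocking(true)?;
    let mut accepted = 0;
    loop {
        // Stop on an explicit message or when every sender has been dropped.
        match rx.try_recv() {
            Ok(()) | Err(TryRecvError::Disconnected) => return Ok(accepted),
            Err(TryRecvError::Empty) => {}
        }
        match listener.accept() {
            Ok((_stream, peer)) => {
                println!("tcp client connected: {} - closing it down.", peer);
                accepted += 1;
            }
            // Nothing pending: sleep briefly instead of busy-waiting.
            // The 10 ms interval is an arbitrary value chosen for this sketch.
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                thread::sleep(Duration::from_millis(10));
            }
            Err(e) => return Err(e),
        }
    }
}

/// Binds the listener, spawns the guard thread, and hands back the join
/// handle together with the sender used to request shutdown.
fn start_guard(addr: &str) -> io::Result<(JoinHandle<io::Result<usize>>, Sender<()>)> {
    let listener = TcpListener::bind(addr)?;
    let (tx, rx) = channel();
    let handle = thread::spawn(move || guard_port(listener, rx));
    Ok((handle, tx))
}
```

A caller would run start_guard("127.0.0.1:8384"), later call tx.send(()) to request shutdown, and then join the handle to wait for the thread to finish.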

BitTickler
Ulrar
  • Having a server run in a thread and using a channel to signal the thread to shut down is a MAJOR use case. Yes, you can poll the ``Receiver``, but unless you set the ``TcpListener`` to non-blocking and poll it as well, you get no clean shutdown. So one has to use mio to wait for both a channel and a listener. – BitTickler May 01 '19 at 20:44

1 Answer


There is now a separate crate that provides the channel-related functionality:

mio-extras

So all you have to do is add that crate to the [dependencies] section of your Cargo.toml.
Then, in your main.rs, add extern crate mio_extras and it is ready to be used.
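For reference, the dependency section might look like the fragment below. The version numbers are an assumption and are not stated in the answer: the code that follows uses Poll::register, Ready and PollOpt, which belong to the mio 0.6 API, and the mio-extras 2.0 line is built on mio 0.6.

```toml
[dependencies]
# Versions are assumptions; pick the ones that match your project.
mio = "0.6"
mio-extras = "2.0"
```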

So, in case of my code snippet I added to the question, it eventually looks like this:

extern crate mio;
extern crate mio_extras;

use std::io;
use mio::*;
use mio::net::*;
use mio_extras::channel::*;
use std::thread::*;

fn guard_external_tcp_port( rx : mio_extras::channel::Receiver::<u8>) -> () {
    let poll = Poll::new().expect("could not create a new Poll instance.");
    const QUIT : Token = Token(0);
    const CONNECT : Token = Token(1);


    let mut events = Events::with_capacity(5);
    poll.register(&rx,QUIT,Ready::readable(),PollOpt::level() ).expect("could not register channel Receiver.");


    let addr : std::net::SocketAddr = "127.0.0.1:8384".parse().unwrap();
    let listener = TcpListener::bind(&addr).expect("tcp guard - bind failed!");

    // A listening socket only needs read readiness (a pending connection).
    poll.register(&listener,CONNECT,Ready::readable(),PollOpt::edge()).expect("could not register our listening socket.");

    let mut running : bool = true;
    //listener.set_nonblocking(true).expect("cannot set tcp listener to nonblocking!");
    while running {
        let _nevents = poll.poll(&mut events,None).unwrap();
        println!("poll returned!");
        for event in &events {
            match event.token() {
                QUIT => running = false,
                CONNECT => {
                    // Edge-triggered: keep accepting until WouldBlock, because
                    // connections still pending will not be reported again.
                    loop {
                        match listener.accept() {
                            Ok((_s,pa)) => {
                                println!("tcp client connected: {} - closing it down."
                                    , pa);
                            }
                            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
                            Err(e) => panic!("encountered IO error: {}", e),
                        }
                    }
                },
                Token(_) => continue
            }
        }
    }
}

fn start_tcpguard() -> (JoinHandle<()>,Sender::<u8>) {
    let (tx,rx) = mio_extras::channel::channel::<u8>();
    let thrd = spawn(move || {
        guard_external_tcp_port(rx);
    });
    (thrd,tx)
}

I also tried the sync_channel(0) version, and it fails to work as intended. The version in the code above, though, works as far as poll receiving the shutdown event is concerned.
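One plausible explanation for the sync_channel(0) behaviour, offered as an assumption rather than something verified against the mio-extras source, is the rendezvous semantics of a zero-capacity channel. In std, a send on such a channel only completes while a receiver is actively receiving, and the guard thread above sits in poll and never calls recv. The std-only sketch below shows that behaviour; the function names are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// With capacity 0, a non-blocking send is refused as long as no receiver
/// is currently blocked in `recv`, even though the receiver is still alive.
fn rendezvous_send_is_refused() -> bool {
    let (tx, _rx) = sync_channel::<u8>(0);
    matches!(tx.try_send(1), Err(TrySendError::Full(1)))
}

/// With capacity 1, the same send is buffered and succeeds immediately.
fn buffered_send_is_accepted() -> bool {
    let (tx, _rx) = sync_channel::<u8>(1);
    tx.try_send(1).is_ok()
}
```

If that is indeed the cause, a bounded channel with capacity 1 or the unbounded channel used in the code above avoids the problem, since the sender no longer has to wait for the receiver.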

BitTickler