Skip to content

Let unpfs server operate through plain file descriptors. #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ cargo run --release 'tcp!0.0.0.0!564' /exportdir
# port number is a suffix to the unix domain socket
# 'unix!/tmp/unpfs-socket!n' creates `/tmp/unpfs-socket:n`
cargo run --release 'unix!/tmp/unpfs-socket!0' /exportdir

# File descriptors (cannot be used directly; see
# example/unpfs/fd-mount.py for a use of this)
cargo build --release
./target/release/unpfs 'fd!0!1' /exportdir
```

You are now ready to import/mount the remote filesystem.
Let's mount it at `/mountdir`:

Expand All @@ -48,6 +54,8 @@ Let's mount it at `/mountdir`:
sudo mount -t 9p -o version=9p2000.L,trans=tcp,port=564,uname=$USER 127.0.0.1 /mountdir
# Unix domain socket
sudo mount -t 9p -o version=9p2000.L,trans=unix,uname=$USER /tmp/unpfs-socket:0 /mountdir
# File descriptors: see example/unpfs/fd-mount.py
# for a working example.
```

| Mount option | Value |
Expand Down
101 changes: 101 additions & 0 deletions example/unpfs/fd-mount.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import getpass
import os
import shlex
import shutil
import socket
import subprocess
import sys
import logging


def error(message: str, exitstatus: int = 4) -> int:
print("error:", message, file=sys.stderr)
return exitstatus


def main() -> int:
"""Run the routine."""
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

hostname = socket.gethostname()
try:
source = sys.argv[1]
target = sys.argv[2]
except IndexError:
relfile = os.path.relpath(__file__, os.getcwd())
return error(
f"""you did not specify the source or the target folders

usage:

export PATH="/path/to/unpfs/binary:$PATH"
{sys.executable} {relfile} <folder to export through unpfs> <mount point for the exported folder>

Note that this program will attempt to run the mount operation using sudo.
If sudo is not passwordless and you are not running as root, the mount
operation will fail.

To see real-time information from unpfs, export variable RUST_LOG=info
before executing this program.
""",
os.EX_USAGE,
)

if not shutil.which("unpfs"):
return error(
"unpfs cannot be found in the system PATH",
os.EX_USAGE,
)

f1_read, f1_write = os.pipe2(0)
f2_read, f2_write = os.pipe2(0)

stdin_for_read = os.fdopen(f1_read, "rb", buffering=0)
stdout_for_write = os.fdopen(f2_write, "wb", buffering=0)
stdin_for_write = os.fdopen(f1_write, "wb", buffering=0)
stdout_for_read = os.fdopen(f2_read, "rb", buffering=0)

# With the fingerprint, use it to invoke the RPC service that was
# just authorized for this script.
cmdline = ["unpfs", "fd!0!1", sys.argv[1]]
env = dict(os.environ.items())
env["RUST_LOG"] = "info"
logger.info("Running %s", shlex.join(cmdline))
subprocess.Popen(
cmdline,
stdin=stdin_for_read,
stdout=stdout_for_write,
bufsize=0,
close_fds=True,
env=env,
)
stdin_for_read.close()
stdout_for_write.close()

uid = os.getuid()
gid = os.getgid()
username = getpass.getuser()
cmdline = [
"/usr/bin/sudo",
"/usr/bin/mount",
"-t",
"9p",
"-o",
"trans=fd,rfdno=%s,wfdno=%s,version=9p2000.L,dfltuid=%s,dfltgid=%s,uname=%s,aname=%s"
% (0, 1, uid, gid, username, source),
"unpfs://%s%s" % (hostname, source),
target,
]
logger.info("Running %s", shlex.join(cmdline))
p2 = subprocess.Popen(
cmdline, stdin=stdout_for_read, stdout=stdin_for_write, close_fds=True
)
stdout_for_read.close()
stdin_for_write.close()

return p2.wait()


if __name__ == "__main__":
sys.exit(main())
10 changes: 7 additions & 3 deletions example/unpfs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,12 @@ impl Filesystem for Unpfs {

async fn unpfs_main(args: Vec<String>) -> rs9p::Result<i32> {
if args.len() < 3 {
eprintln!("Usage: {} proto!address!port mountpoint", args[0]);
eprintln!(" where: proto = tcp | unix");
eprintln!("Usage: {} <proto!address!port> <mountpoint>", args[0]);
eprintln!(" where: proto = tcp | unix | fd");
eprintln!("Examples:");
eprintln!(" {} 'tcp!0.0.0.0!564' /path/to/export", args[0]);
eprintln!(" {} 'unix!/tmp/unpfs.socket!0' /path/to/export", args[0]);
eprintln!(" {} 'fd!0!1' /path/to/export", args[0]);
return Ok(-1);
}

Expand All @@ -402,7 +406,7 @@ async fn unpfs_main(args: Vec<String>) -> rs9p::Result<i32> {
return res!(io_err!(Other, "mount point must be a directory"));
}

println!("[*] Ready to accept clients: {}", addr);
eprintln!("[*] About to accept clients: {}", addr);
srv_async(
Unpfs {
realroot: mountpoint,
Expand Down
2 changes: 2 additions & 0 deletions example/unpfs/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ macro_rules! INVALID_FID {

pub fn create_buffer(size: usize) -> Vec<u8> {
let mut buffer = Vec::with_capacity(size);
// https://rust-lang.github.io/rust-clippy/master/index.html#/uninit_vec
let _remaining = buffer.spare_capacity_mut();
unsafe {
buffer.set_len(size);
}
Expand Down
58 changes: 43 additions & 15 deletions src/srv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ use {
error::errno::*,
fcall::*,
serialize,
utils::{self, Result},
utils::{self, Result, AddrSpec},
},
async_trait::async_trait,
bytes::buf::{Buf, BufMut},
futures::sink::SinkExt,
std::{collections::HashMap, sync::Arc},
std::{collections::HashMap, sync::Arc, str::FromStr, os::fd::FromRawFd},
tokio::{
io::{AsyncRead, AsyncWrite},
net::{TcpListener, UnixListener},
Expand Down Expand Up @@ -396,14 +396,15 @@ where

loop {
let (stream, peer) = listener.accept().await?;
info!("accepted: {:?}", peer);
info!("Accepted TCP peer: {:?}", peer);

let fs = filesystem.clone();
tokio::spawn(async move {
let (readhalf, writehalf) = stream.into_split();
let res = dispatch(fs, readhalf, writehalf).await;
if let Err(e) = res {
error!("Error: {}: {:?}", e, e);
if let Err(e) = dispatch(fs, readhalf, writehalf).await {
error!("Error with TCP peer {:?}: {:?}", peer, e);
} else {
info!("TCP peer {:?} now disconnected", peer);
}
});
}
Expand All @@ -417,29 +418,56 @@ where

loop {
let (stream, peer) = listener.accept().await?;
info!("accepted: {:?}", peer);
info!("Accepted UNIX peer: {:?}", peer);

let fs = filesystem.clone();
tokio::spawn(async move {
let (readhalf, writehalf) = tokio::io::split(stream);
let res = dispatch(fs, readhalf, writehalf).await;
if let Err(e) = res {
error!("Error: {:?}", e);
if let Err(e) = dispatch(fs, readhalf, writehalf).await {
error!("Error with UNIX peer {:?}: {:?}", peer, e);
} else {
info!("UNIX peer {:?} now disconnected", peer);
}
});
}
}

pub async fn srv_once_fd<Fs>(filesystem: Fs, readfd: i32, writefd: i32) -> Result<()>
where
Fs: 'static + Filesystem + Send + Sync + Clone,
{
let readhalf: tokio::fs::File;
let writehalf: tokio::fs::File;
unsafe {
readhalf = tokio::fs::File::from_raw_fd(readfd);
writehalf = tokio::fs::File::from_raw_fd(writefd);
}
info!("Accepted FD pair peer: {:?},{:?}", readfd, writefd);
let fs = filesystem.clone();
if let Err(e) = dispatch(fs, readhalf, writehalf).await {
error!("Error with FD pair peer {:?},{:?}: {:?}", readfd, writefd, e);
Err(e)
} else {
info!("FD pair peer {:?},{:?} now disconnected", readfd, writefd);
Ok(())
}
}

pub async fn srv_async<Fs>(filesystem: Fs, addr: &str) -> Result<()>
where
Fs: 'static + Filesystem + Send + Sync + Clone,
{
let (proto, listen_addr) = utils::parse_proto(addr)
.ok_or_else(|| io_err!(InvalidInput, "Invalid protocol or address"))?;
let proto = match utils::AddrSpec::from_str(addr) {
Ok(p) => p,
Err(e) => {
error!("error: {}", e);
return Err(error::Error::No(nix::errno::Errno::EINVAL));
},
};

match proto {
"tcp" => srv_async_tcp(filesystem, &listen_addr).await,
"unix" => srv_async_unix(filesystem, &listen_addr).await,
_ => Err(From::from(io_err!(InvalidInput, "Protocol not supported"))),
AddrSpec::Tcp(listen_addr) => srv_async_tcp(filesystem, listen_addr.as_str()).await,
AddrSpec::Unix(listen_addr) => srv_async_unix(filesystem, listen_addr.as_str()).await,
AddrSpec::Fd(readfd, writefd) => srv_once_fd(filesystem, readfd, writefd).await,
}
}
86 changes: 82 additions & 4 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::fmt;
use crate::error;
use std::result::Result as StdResult;
use std::str::FromStr;

pub type Result<T> = ::std::result::Result<T, error::Error>;

Expand All @@ -14,9 +17,84 @@ macro_rules! res {
};
}

pub fn parse_proto(arg: &str) -> Option<(&str, String)> {
let mut split = arg.split('!');
let (proto, addr, port) = (split.next()?, split.next()?, split.next()?);
#[derive(Debug)]
pub struct AddrSpecParseError {
message: String,
}

impl AddrSpecParseError {
fn new(message :String) -> Self {
return AddrSpecParseError{message}
}
}

impl std::error::Error for AddrSpecParseError {}

impl fmt::Display for AddrSpecParseError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.message)
}
}

pub enum AddrSpec {
Tcp(String),
Unix(String),
Fd(i32, i32),
}

impl FromStr for AddrSpec {
type Err = AddrSpecParseError;

fn from_str(arg: &str) -> StdResult<Self, Self::Err> {
let mut split = arg.split('!');
let proto = match split.next() {
Some(p) => p,
None => return Err(AddrSpecParseError::new("No protocol specified".into())),
};

Some((proto, addr.to_owned() + ":" + port))
match proto {
"tcp" => {
let addr = match split.next() {
Some(p) => p,
None => return Err(AddrSpecParseError::new("No listen address specified".into())),
};
let port = match split.next() {
Some(p) => p,
None => return Err(AddrSpecParseError::new("No listen port specified".into())),
};
Ok(AddrSpec::Tcp(addr.to_owned() + ":" + port))
},
"unix" => {
let addr = match split.next() {
Some(p) => p,
None => return Err(AddrSpecParseError::new("No listen socket path specified".into())),
};
let port = match split.next() {
Some(p) => p,
None => return Err(AddrSpecParseError::new("No listen socket port specified".into())),
};
Ok(AddrSpec::Unix(addr.to_owned() + ":" + port))
},
"fd" => {
let readfd = match split.next() {
Some(p) => match p.parse::<i32>() {
Ok(p) => p,
Err(e) => return Err(AddrSpecParseError::new(format!("Invalid read file descriptor: {}", e))),
},
None => return Err(AddrSpecParseError::new("No read file descriptor specified".into())),
};
let writefd = match split.next() {
Some(p) => match p.parse::<i32>() {
Ok(p) => p,
Err(e) => return Err(AddrSpecParseError::new(format!("Invalid write file descriptor: {}", e))),
},
None => return Err(AddrSpecParseError::new("No file descriptor specified".into())),
};
Ok(AddrSpec::Fd(readfd, writefd))
},
_ => {
Err(AddrSpecParseError::new(format!("Unsupported protocol {}", proto)))
}
}
}
}