Skip to content

Rustup #80

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 134 additions & 104 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ features = ["unicode"]

[dependencies.futures-preview]
git = "https://github.com/embed-rs/futures-rs.git"
branch = "alloc"
branch = "alloc-rebase"
default-features = false
features = ["alloc"]

Expand Down
14 changes: 7 additions & 7 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub mod future {
pin::Pin,
ptr,
sync::atomic::{AtomicPtr, Ordering},
task::{LocalWaker, Poll},
task::{Waker, Poll},
};

/// Wrap a future in a generator.
Expand All @@ -36,7 +36,7 @@ pub mod future {

impl<T: Generator<Yield = ()>> Future for GenFuture<T> {
type Output = T::Return;
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, lw: &Waker) -> Poll<Self::Output> {
// Safe because we're !Unpin + !Drop mapping to a ?Unpin value
let gen = unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) };
set_task_waker(lw, || match gen.resume() {
Expand All @@ -47,9 +47,9 @@ pub mod future {
}

// FIXME: Should be thread local, but is currently a static since we only have a single thread
static TLS_WAKER: AtomicPtr<LocalWaker> = AtomicPtr::new(ptr::null_mut());
static TLS_WAKER: AtomicPtr<Waker> = AtomicPtr::new(ptr::null_mut());

struct SetOnDrop(*mut LocalWaker);
struct SetOnDrop(*mut Waker);

impl Drop for SetOnDrop {
fn drop(&mut self) {
Expand All @@ -58,7 +58,7 @@ pub mod future {
}

/// Sets the thread-local task context used by async/await futures.
pub fn set_task_waker<F, R>(lw: &LocalWaker, f: F) -> R
pub fn set_task_waker<F, R>(lw: &Waker, f: F) -> R
where
F: FnOnce() -> R,
{
Expand All @@ -75,14 +75,14 @@ pub mod future {
/// retrieved by a surrounding call to get_task_waker.
pub fn get_task_waker<F, R>(f: F) -> R
where
F: FnOnce(&LocalWaker) -> R,
F: FnOnce(&Waker) -> R,
{
// Clear the entry so that nested `get_task_waker` calls
// will fail or set their own value.
let waker_ptr = TLS_WAKER.swap(ptr::null_mut(), Ordering::SeqCst);
let _reset_waker = SetOnDrop(waker_ptr);

let waker_ptr = unsafe { waker_ptr.as_ref() }.expect("TLS LocalWaker not set.");
let waker_ptr = unsafe { waker_ptr.as_ref() }.expect("TLS Waker not set.");
f(waker_ptr)
}

Expand Down
15 changes: 8 additions & 7 deletions src/bin/async-await.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ where

async fn run(mut self) {
use smoltcp::dhcp::Dhcpv4Client;
use smoltcp::socket::{RawPacketMetadata, RawSocketBuffer, SocketSet};
use smoltcp::socket::SocketSet;
use smoltcp::wire::{IpCidr, Ipv4Address};

let ethernet_interface = ethernet::EthernetDevice::new(
Expand All @@ -517,14 +517,14 @@ where

let mut sockets = SocketSet::new(Vec::new());

let dhcp_rx_buffer = RawSocketBuffer::new([RawPacketMetadata::EMPTY; 1], vec![0; 1500]);
let dhcp_tx_buffer = RawSocketBuffer::new([RawPacketMetadata::EMPTY; 1], vec![0; 3000]);
let dhcp_rx_buffer = UdpSocketBuffer::new([UdpPacketMetadata::EMPTY; 1], vec![0; 1500]);
let dhcp_tx_buffer = UdpSocketBuffer::new([UdpPacketMetadata::EMPTY; 1], vec![0; 3000]);
let mut dhcp = Dhcpv4Client::new(
&mut sockets,
dhcp_rx_buffer,
dhcp_tx_buffer,
Instant::from_millis(system_clock::ms() as i64),
);
).expect("could not bind udp socket for dhcp");
let mut prev_ip_addr = iface.ipv4_addr().unwrap();

// handle new ethernet packets
Expand All @@ -546,8 +546,8 @@ where
}
}

dhcp.poll(&mut iface, &mut sockets, timestamp)
.unwrap_or_else(|e| println!("DHCP: {:?}", e));
let config = dhcp.poll(&mut iface, &mut sockets, timestamp)
.unwrap_or_else(|e| {println!("DHCP: {:?}", e); None });
let ip_addr = iface.ipv4_addr().unwrap();
if ip_addr != prev_ip_addr {
println!("\nAssigned a new IPv4 address: {}", ip_addr);
Expand All @@ -558,7 +558,7 @@ where
println!("Default gateway: {}", default_route.via_router);
});
});
for dns_server in dhcp.dns_servers() {
for dns_server in config.iter().flat_map(|c| c.dns_servers.iter()).filter_map(|x| x.as_ref()) {
println!("DNS servers: {}", dns_server);
}

Expand Down Expand Up @@ -608,6 +608,7 @@ where
}
socket.send_slice(&reply.0, reply.1)?;
},
smoltcp::dhcp::UDP_CLIENT_PORT => {}, // dhcp packet
_ => unreachable!(),
},
&mut Socket::Tcp(ref mut socket) => match socket.local_endpoint().port {
Expand Down
18 changes: 9 additions & 9 deletions src/bin/polling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ use alloc_cortex_m::CortexMHeap;
use core::alloc::Layout as AllocLayout;
use core::fmt::Write;
use core::panic::PanicInfo;
use cortex_m::{asm, interrupt};
use cortex_m::{asm, interrupt, peripheral::NVIC};
use rt::{entry, exception, ExceptionFrame};
use sh::hio::{self, HStdout};
use smoltcp::{
dhcp::Dhcpv4Client,
socket::{
RawPacketMetadata, RawSocketBuffer, Socket, SocketSet, TcpSocket, TcpSocketBuffer,
Socket, SocketSet, TcpSocket, TcpSocketBuffer,
UdpPacketMetadata, UdpSocket, UdpSocketBuffer,
},
time::Instant,
Expand Down Expand Up @@ -156,14 +156,14 @@ fn main() -> ! {
};

let mut sockets = SocketSet::new(Vec::new());
let dhcp_rx_buffer = RawSocketBuffer::new([RawPacketMetadata::EMPTY; 1], vec![0; 1500]);
let dhcp_tx_buffer = RawSocketBuffer::new([RawPacketMetadata::EMPTY; 1], vec![0; 3000]);
let dhcp_rx_buffer = UdpSocketBuffer::new([UdpPacketMetadata::EMPTY; 1], vec![0; 1500]);
let dhcp_tx_buffer = UdpSocketBuffer::new([UdpPacketMetadata::EMPTY; 1], vec![0; 3000]);
let mut dhcp = Dhcpv4Client::new(
&mut sockets,
dhcp_rx_buffer,
dhcp_tx_buffer,
Instant::from_millis(system_clock::ms() as i64),
);
).expect("could not bind udp socket");

let mut previous_button_state = pins.button.get();
let mut audio_writer = AudioWriter::new();
Expand All @@ -175,7 +175,7 @@ fn main() -> ! {
pins.led.toggle();

// trigger the `EXTI0` interrupt
nvic.set_pending(Interrupt::EXTI0);
NVIC::pend(Interrupt::EXTI0);
}

previous_button_state = current_button_state;
Expand Down Expand Up @@ -216,8 +216,8 @@ fn main() -> ! {
}
}

dhcp.poll(iface, &mut sockets, timestamp)
.unwrap_or_else(|e| println!("DHCP: {:?}", e));
let config = dhcp.poll(iface, &mut sockets, timestamp)
.unwrap_or_else(|e| { println!("DHCP: {:?}", e); None});
let ip_addr = iface.ipv4_addr().unwrap();
if ip_addr != *prev_ip_addr {
println!("\nAssigned a new IPv4 address: {}", ip_addr);
Expand All @@ -228,7 +228,7 @@ fn main() -> ! {
println!("Default gateway: {}", default_route.via_router);
});
});
for dns_server in dhcp.dns_servers() {
for dns_server in config.iter().flat_map(|c| c.dns_servers.iter()).filter_map(|x| x.as_ref()) {
println!("DNS servers: {}", dns_server);
}

Expand Down
8 changes: 4 additions & 4 deletions src/future_mutex.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
//! Provides a non-blocking Mutex based on Futures.

use crate::mpsc_queue::{PopResult, Queue};
use alloc::task::LocalWaker;
use core::task::Waker;
use core::{future::Future, mem, pin::Pin};
use futures::task::Poll;
use spin::Mutex;

/// A Mutex that yields instead of blocking.
pub struct FutureMutex<T> {
mutex: Mutex<T>,
waker_queue: Queue<LocalWaker>,
waker_queue: Queue<Waker>,
}

impl<T> FutureMutex<T> {
Expand Down Expand Up @@ -44,7 +44,7 @@ where
{
mutex: &'a Mutex<T>,
f: Option<F>,
waker_queue: &'a Queue<LocalWaker>,
waker_queue: &'a Queue<Waker>,
}

impl<'a, T, R, F> Future for FutureMutexResult<'a, T, R, F>
Expand All @@ -53,7 +53,7 @@ where
{
type Output = R;

fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, lw: &Waker) -> Poll<Self::Output> {
match self.mutex.try_lock() {
None => {
self.waker_queue.push(lw.clone());
Expand Down
4 changes: 2 additions & 2 deletions src/interrupts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,13 +479,13 @@ impl<'a> InterruptTable<'a> {
/// Clears the pending state of the interrupt corresponding to the `interrupt_handle`.
pub fn clear_pending_state<T>(&mut self, interrupt_handle: &InterruptHandle<T>) {
let irq = InterruptId(interrupt_handle.irq.nr());
self.nvic.clear_pending(irq);
NVIC::unpend(irq);
}

/// Sets the pending state of the interrupt corresponding to the `interrupt_handle`.
pub fn set_pending_state<T>(&mut self, interrupt_handle: &InterruptHandle<T>) {
let irq = InterruptId(interrupt_handle.irq.nr());
self.nvic.set_pending(irq);
NVIC::pend(irq);
}

/// Returns the pending state of the interrupt corresponding to the `interrupt_handle`.
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#![feature(never_type)]
#![feature(generators)]
#![feature(async_await)]
#![feature(const_transmute)]
#![warn(missing_docs)]

#[macro_use]
Expand Down
62 changes: 47 additions & 15 deletions src/task_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ use alloc::{
collections::BTreeMap,
prelude::*,
sync::Arc,
task::{local_waker_from_nonlocal, LocalWaker, Wake},
};
use core::ops::{Add, AddAssign};
use core::pin::Pin;
use futures::{
channel::mpsc,
future::{FutureObj, LocalFutureObj},
prelude::*,
task::{LocalSpawn, Poll, Spawn, SpawnError},
task::{LocalSpawn, Poll, Spawn, SpawnError, Waker, RawWaker, RawWakerVTable},
};

/// An executor that schedules tasks round-robin, and executes an idle_task
Expand Down Expand Up @@ -77,32 +76,51 @@ impl Executor {
woken_tasks: self.woken_tasks.clone(),
};
let poll_result = {
let task = self.tasks.get_mut(&task_id).expect(&format!("task with id {:?} not found", task_id));
task.as_mut().poll(&local_waker_from_nonlocal(Arc::new(waker)))
let task = self.tasks.get_mut(&task_id).unwrap_or_else(|| panic!("task with id {:?} not found", task_id));
task.as_mut().poll(&waker.into_waker())
};
if poll_result.is_ready() {
self.tasks.remove(&task_id).expect(&format!("Task {:?} not found", task_id));
self.tasks.remove(&task_id).unwrap_or_else(|| panic!("Task {:?} not found", task_id));
}
}
PopResult::Empty => {}
PopResult::Inconsistent => {} // println!("woken_tasks queue is inconsistent"),
}
if let Some(ref mut idle_task) = self.idle_task {
idle_task
let _ = idle_task
.as_mut()
.poll(&local_waker_from_nonlocal(Arc::new(NoOpWaker)));
.poll(&NoOpWaker.into_waker());
};
}
}

#[derive(Clone)]
struct MyWaker {
task_id: TaskId,
woken_tasks: Arc<Queue<TaskId>>,
}

impl Wake for MyWaker {
fn wake(arc_self: &Arc<Self>) {
arc_self.woken_tasks.push(arc_self.task_id);
const MY_WAKER_VTABLE: RawWakerVTable = unsafe { RawWakerVTable {
drop: core::mem::transmute(MyWaker::waker_drop as fn(Box<MyWaker>)),
wake: core::mem::transmute(MyWaker::wake as fn(&MyWaker)),
clone: core::mem::transmute(MyWaker::waker_clone as fn(&MyWaker) -> RawWaker),
}};

impl MyWaker {
fn into_raw_waker(self) -> RawWaker {
RawWaker::new(Box::into_raw(Box::new(self)) as *const (), &MY_WAKER_VTABLE)
}
fn waker_drop(_: Box<Self>) {}
fn waker_clone(&self) -> RawWaker {
self.clone().into_raw_waker()
}
fn wake(&self) {
self.woken_tasks.push(self.task_id);
}
fn into_waker(self) -> Waker {
unsafe {
Waker::new_unchecked(self.into_raw_waker())
}
}
}

Expand All @@ -125,8 +143,22 @@ impl AddAssign<u64> for TaskId {

struct NoOpWaker;

impl Wake for NoOpWaker {
fn wake(_arc_self: &Arc<Self>) {}
impl NoOpWaker {
fn into_waker(self) -> Waker {
unsafe {
Waker::new_unchecked(self.into_raw_waker())
}
}
fn into_raw_waker(self) -> RawWaker {
RawWaker::new(
&NoOpWaker as *const _ as *const (),
&RawWakerVTable {
drop: (|_| {}) as fn(*const ()),
wake: (|_| {}) as fn(*const ()),
clone: (|_| NoOpWaker.into_raw_waker()) as fn(*const ()) -> RawWaker,
},
)
}
}

/// This stream can be used by tasks that want to run when the CPU is idle.
Expand All @@ -139,15 +171,15 @@ impl Wake for NoOpWaker {
#[derive(Debug, Clone)]
pub struct IdleStream {
idle: bool,
idle_waker_sink: mpsc::UnboundedSender<LocalWaker>,
idle_waker_sink: mpsc::UnboundedSender<Waker>,
}

impl IdleStream {
/// Creates a new IdleStream with the passed sending end of an idle stream.
///
/// The idle task should wake the tasks received from the receiving end
/// of the idle stream, thereby waking the tasks on idle.
pub fn new(idle_waker_sink: mpsc::UnboundedSender<LocalWaker>) -> Self {
pub fn new(idle_waker_sink: mpsc::UnboundedSender<Waker>) -> Self {
IdleStream {
idle_waker_sink,
idle: false,
Expand All @@ -158,7 +190,7 @@ impl IdleStream {
impl futures::prelude::Stream for IdleStream {
type Item = ();

fn poll_next(mut self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Option<()>> {
fn poll_next(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<()>> {
let result = if self.idle {
Poll::Ready(Some(()))
} else {
Expand Down