Relax an assertion in start_selection()

It asserted that the previous count was always nonnegative, but DISCONNECTED is
a valid value for it to see. In order to continue to remember to store
DISCONNECTED after DISCONNECTED was seen, I also added a helper method.

Closes #12226
This commit is contained in:
Alex Crichton 2014-02-12 20:51:42 -08:00
parent 411a01feb3
commit 065e121fc2
3 changed files with 122 additions and 10 deletions

View file

@ -151,6 +151,11 @@ impl Select {
/// event could either be that data is available or the corresponding /// event could either be that data is available or the corresponding
/// channel has been closed. /// channel has been closed.
pub fn wait(&self) -> uint { pub fn wait(&self) -> uint {
self.wait2(false)
}
/// Helper method for skipping the preflight checks during testing
fn wait2(&self, do_preflight_checks: bool) -> uint {
// Note that this is currently an inefficient implementation. We in // Note that this is currently an inefficient implementation. We in
// theory have knowledge about all ports in the set ahead of time, so // theory have knowledge about all ports in the set ahead of time, so
// this method shouldn't really have to iterate over all of them yet // this method shouldn't really have to iterate over all of them yet
@ -175,7 +180,7 @@ impl Select {
let mut amt = 0; let mut amt = 0;
for p in self.iter() { for p in self.iter() {
amt += 1; amt += 1;
if (*p).packet.can_recv() { if do_preflight_checks && (*p).packet.can_recv() {
return (*p).id; return (*p).id;
} }
} }
@ -507,7 +512,7 @@ mod test {
let (p2, c2) = Chan::<()>::new(); let (p2, c2) = Chan::<()>::new();
let (p, c) = Chan::new(); let (p, c) = Chan::new();
spawn(proc() { spawn(proc() {
let mut s = Select::new(); let s = Select::new();
let mut h1 = s.handle(&p1); let mut h1 = s.handle(&p1);
let mut h2 = s.handle(&p2); let mut h2 = s.handle(&p2);
unsafe { h2.add(); } unsafe { h2.add(); }
@ -521,4 +526,91 @@ mod test {
c2.send(()); c2.send(());
p.recv(); p.recv();
}) })
test!(fn preflight1() {
let (p, c) = Chan::new();
c.send(());
select!(
() = p.recv() => {},
)
})
test!(fn preflight2() {
let (p, c) = Chan::new();
c.send(());
c.send(());
select!(
() = p.recv() => {},
)
})
test!(fn preflight3() {
let (p, c) = Chan::new();
drop(c.clone());
c.send(());
select!(
() = p.recv() => {},
)
})
test!(fn preflight4() {
let (p, c) = Chan::new();
c.send(());
let s = Select::new();
let mut h = s.handle(&p);
unsafe { h.add(); }
assert_eq!(s.wait2(false), h.id);
})
test!(fn preflight5() {
let (p, c) = Chan::new();
c.send(());
c.send(());
let s = Select::new();
let mut h = s.handle(&p);
unsafe { h.add(); }
assert_eq!(s.wait2(false), h.id);
})
test!(fn preflight6() {
let (p, c) = Chan::new();
drop(c.clone());
c.send(());
let s = Select::new();
let mut h = s.handle(&p);
unsafe { h.add(); }
assert_eq!(s.wait2(false), h.id);
})
test!(fn preflight7() {
let (p, c) = Chan::<()>::new();
drop(c);
let s = Select::new();
let mut h = s.handle(&p);
unsafe { h.add(); }
assert_eq!(s.wait2(false), h.id);
})
test!(fn preflight8() {
let (p, c) = Chan::new();
c.send(());
drop(c);
p.recv();
let s = Select::new();
let mut h = s.handle(&p);
unsafe { h.add(); }
assert_eq!(s.wait2(false), h.id);
})
test!(fn preflight9() {
let (p, c) = Chan::new();
drop(c.clone());
c.send(());
drop(c);
p.recv();
let s = Select::new();
let mut h = s.handle(&p);
unsafe { h.add(); }
assert_eq!(s.wait2(false), h.id);
})
} }

View file

@ -398,6 +398,17 @@ impl<T: Send> Packet<T> {
cnt == DISCONNECTED || cnt - self.steals > 0 cnt == DISCONNECTED || cnt - self.steals > 0
} }
// increment the count on the channel (used for selection)
fn bump(&mut self, amt: int) -> int {
match self.cnt.fetch_add(amt, atomics::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, atomics::SeqCst);
DISCONNECTED
}
n => n
}
}
// Inserts the blocked task for selection on this port, returning it back if // Inserts the blocked task for selection on this port, returning it back if
// the port already has data on it. // the port already has data on it.
// //
@ -408,8 +419,8 @@ impl<T: Send> Packet<T> {
match self.decrement(task) { match self.decrement(task) {
Ok(()) => Ok(()), Ok(()) => Ok(()),
Err(task) => { Err(task) => {
let prev = self.cnt.fetch_add(1, atomics::SeqCst); let prev = self.bump(1);
assert!(prev >= 0); assert!(prev == DISCONNECTED || prev >= 0);
return Err(task); return Err(task);
} }
} }
@ -440,11 +451,10 @@ impl<T: Send> Packet<T> {
let cnt = self.cnt.load(atomics::SeqCst); let cnt = self.cnt.load(atomics::SeqCst);
if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0} if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
}; };
let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst); let prev = self.bump(steals + 1);
if prev == DISCONNECTED { if prev == DISCONNECTED {
assert_eq!(self.to_wake.load(atomics::SeqCst), 0); assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
self.cnt.store(DISCONNECTED, atomics::SeqCst);
true true
} else { } else {
let cur = prev + steals + 1; let cur = prev + steals + 1;

View file

@ -333,6 +333,17 @@ impl<T: Send> Packet<T> {
} }
} }
// increment the count on the channel (used for selection)
fn bump(&mut self, amt: int) -> int {
match self.cnt.fetch_add(amt, atomics::SeqCst) {
DISCONNECTED => {
self.cnt.store(DISCONNECTED, atomics::SeqCst);
DISCONNECTED
}
n => n
}
}
// Attempts to start selecting on this port. Like a oneshot, this can fail // Attempts to start selecting on this port. Like a oneshot, this can fail
// immediately because of an upgrade. // immediately because of an upgrade.
pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> { pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
@ -351,8 +362,8 @@ impl<T: Send> Packet<T> {
}; };
// Undo our decrement above, and we should be guaranteed that the // Undo our decrement above, and we should be guaranteed that the
// previous value is positive because we're not going to sleep // previous value is positive because we're not going to sleep
let prev = self.cnt.fetch_add(1, atomics::SeqCst); let prev = self.bump(1);
assert!(prev >= 0); assert!(prev == DISCONNECTED || prev >= 0);
return ret; return ret;
} }
} }
@ -384,13 +395,12 @@ impl<T: Send> Packet<T> {
// and in the stream case we can have at most one steal, so just assume // and in the stream case we can have at most one steal, so just assume
// that we had one steal. // that we had one steal.
let steals = 1; let steals = 1;
let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst); let prev = self.bump(steals + 1);
// If we were previously disconnected, then we know for sure that there // If we were previously disconnected, then we know for sure that there
// is no task in to_wake, so just keep going // is no task in to_wake, so just keep going
let has_data = if prev == DISCONNECTED { let has_data = if prev == DISCONNECTED {
assert_eq!(self.to_wake.load(atomics::SeqCst), 0); assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
self.cnt.store(DISCONNECTED, atomics::SeqCst);
true // there is data, that data is that we're disconnected true // there is data, that data is that we're disconnected
} else { } else {
let cur = prev + steals + 1; let cur = prev + steals + 1;