Relax an assertion in start_selection()
It asserted that the previous count was always nonnegative, but DISCONNECTED is a valid value for it to see. In order to continue to remember to store DISCONNECTED after DISCONNECTED was seen, I also added a helper method. Closes #12226
This commit is contained in:
parent
411a01feb3
commit
065e121fc2
3 changed files with 122 additions and 10 deletions
|
|
@ -151,6 +151,11 @@ impl Select {
|
||||||
/// event could either be that data is available or the corresponding
|
/// event could either be that data is available or the corresponding
|
||||||
/// channel has been closed.
|
/// channel has been closed.
|
||||||
pub fn wait(&self) -> uint {
|
pub fn wait(&self) -> uint {
|
||||||
|
self.wait2(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Helper method for skipping the preflight checks during testing
|
||||||
|
fn wait2(&self, do_preflight_checks: bool) -> uint {
|
||||||
// Note that this is currently an inefficient implementation. We in
|
// Note that this is currently an inefficient implementation. We in
|
||||||
// theory have knowledge about all ports in the set ahead of time, so
|
// theory have knowledge about all ports in the set ahead of time, so
|
||||||
// this method shouldn't really have to iterate over all of them yet
|
// this method shouldn't really have to iterate over all of them yet
|
||||||
|
|
@ -175,7 +180,7 @@ impl Select {
|
||||||
let mut amt = 0;
|
let mut amt = 0;
|
||||||
for p in self.iter() {
|
for p in self.iter() {
|
||||||
amt += 1;
|
amt += 1;
|
||||||
if (*p).packet.can_recv() {
|
if do_preflight_checks && (*p).packet.can_recv() {
|
||||||
return (*p).id;
|
return (*p).id;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -507,7 +512,7 @@ mod test {
|
||||||
let (p2, c2) = Chan::<()>::new();
|
let (p2, c2) = Chan::<()>::new();
|
||||||
let (p, c) = Chan::new();
|
let (p, c) = Chan::new();
|
||||||
spawn(proc() {
|
spawn(proc() {
|
||||||
let mut s = Select::new();
|
let s = Select::new();
|
||||||
let mut h1 = s.handle(&p1);
|
let mut h1 = s.handle(&p1);
|
||||||
let mut h2 = s.handle(&p2);
|
let mut h2 = s.handle(&p2);
|
||||||
unsafe { h2.add(); }
|
unsafe { h2.add(); }
|
||||||
|
|
@ -521,4 +526,91 @@ mod test {
|
||||||
c2.send(());
|
c2.send(());
|
||||||
p.recv();
|
p.recv();
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test!(fn preflight1() {
|
||||||
|
let (p, c) = Chan::new();
|
||||||
|
c.send(());
|
||||||
|
select!(
|
||||||
|
() = p.recv() => {},
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
test!(fn preflight2() {
|
||||||
|
let (p, c) = Chan::new();
|
||||||
|
c.send(());
|
||||||
|
c.send(());
|
||||||
|
select!(
|
||||||
|
() = p.recv() => {},
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
test!(fn preflight3() {
|
||||||
|
let (p, c) = Chan::new();
|
||||||
|
drop(c.clone());
|
||||||
|
c.send(());
|
||||||
|
select!(
|
||||||
|
() = p.recv() => {},
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
test!(fn preflight4() {
|
||||||
|
let (p, c) = Chan::new();
|
||||||
|
c.send(());
|
||||||
|
let s = Select::new();
|
||||||
|
let mut h = s.handle(&p);
|
||||||
|
unsafe { h.add(); }
|
||||||
|
assert_eq!(s.wait2(false), h.id);
|
||||||
|
})
|
||||||
|
|
||||||
|
test!(fn preflight5() {
|
||||||
|
let (p, c) = Chan::new();
|
||||||
|
c.send(());
|
||||||
|
c.send(());
|
||||||
|
let s = Select::new();
|
||||||
|
let mut h = s.handle(&p);
|
||||||
|
unsafe { h.add(); }
|
||||||
|
assert_eq!(s.wait2(false), h.id);
|
||||||
|
})
|
||||||
|
|
||||||
|
test!(fn preflight6() {
|
||||||
|
let (p, c) = Chan::new();
|
||||||
|
drop(c.clone());
|
||||||
|
c.send(());
|
||||||
|
let s = Select::new();
|
||||||
|
let mut h = s.handle(&p);
|
||||||
|
unsafe { h.add(); }
|
||||||
|
assert_eq!(s.wait2(false), h.id);
|
||||||
|
})
|
||||||
|
|
||||||
|
test!(fn preflight7() {
|
||||||
|
let (p, c) = Chan::<()>::new();
|
||||||
|
drop(c);
|
||||||
|
let s = Select::new();
|
||||||
|
let mut h = s.handle(&p);
|
||||||
|
unsafe { h.add(); }
|
||||||
|
assert_eq!(s.wait2(false), h.id);
|
||||||
|
})
|
||||||
|
|
||||||
|
test!(fn preflight8() {
|
||||||
|
let (p, c) = Chan::new();
|
||||||
|
c.send(());
|
||||||
|
drop(c);
|
||||||
|
p.recv();
|
||||||
|
let s = Select::new();
|
||||||
|
let mut h = s.handle(&p);
|
||||||
|
unsafe { h.add(); }
|
||||||
|
assert_eq!(s.wait2(false), h.id);
|
||||||
|
})
|
||||||
|
|
||||||
|
test!(fn preflight9() {
|
||||||
|
let (p, c) = Chan::new();
|
||||||
|
drop(c.clone());
|
||||||
|
c.send(());
|
||||||
|
drop(c);
|
||||||
|
p.recv();
|
||||||
|
let s = Select::new();
|
||||||
|
let mut h = s.handle(&p);
|
||||||
|
unsafe { h.add(); }
|
||||||
|
assert_eq!(s.wait2(false), h.id);
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -398,6 +398,17 @@ impl<T: Send> Packet<T> {
|
||||||
cnt == DISCONNECTED || cnt - self.steals > 0
|
cnt == DISCONNECTED || cnt - self.steals > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// increment the count on the channel (used for selection)
|
||||||
|
fn bump(&mut self, amt: int) -> int {
|
||||||
|
match self.cnt.fetch_add(amt, atomics::SeqCst) {
|
||||||
|
DISCONNECTED => {
|
||||||
|
self.cnt.store(DISCONNECTED, atomics::SeqCst);
|
||||||
|
DISCONNECTED
|
||||||
|
}
|
||||||
|
n => n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Inserts the blocked task for selection on this port, returning it back if
|
// Inserts the blocked task for selection on this port, returning it back if
|
||||||
// the port already has data on it.
|
// the port already has data on it.
|
||||||
//
|
//
|
||||||
|
|
@ -408,8 +419,8 @@ impl<T: Send> Packet<T> {
|
||||||
match self.decrement(task) {
|
match self.decrement(task) {
|
||||||
Ok(()) => Ok(()),
|
Ok(()) => Ok(()),
|
||||||
Err(task) => {
|
Err(task) => {
|
||||||
let prev = self.cnt.fetch_add(1, atomics::SeqCst);
|
let prev = self.bump(1);
|
||||||
assert!(prev >= 0);
|
assert!(prev == DISCONNECTED || prev >= 0);
|
||||||
return Err(task);
|
return Err(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -440,11 +451,10 @@ impl<T: Send> Packet<T> {
|
||||||
let cnt = self.cnt.load(atomics::SeqCst);
|
let cnt = self.cnt.load(atomics::SeqCst);
|
||||||
if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
|
if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
|
||||||
};
|
};
|
||||||
let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst);
|
let prev = self.bump(steals + 1);
|
||||||
|
|
||||||
if prev == DISCONNECTED {
|
if prev == DISCONNECTED {
|
||||||
assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
|
assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
|
||||||
self.cnt.store(DISCONNECTED, atomics::SeqCst);
|
|
||||||
true
|
true
|
||||||
} else {
|
} else {
|
||||||
let cur = prev + steals + 1;
|
let cur = prev + steals + 1;
|
||||||
|
|
|
||||||
|
|
@ -333,6 +333,17 @@ impl<T: Send> Packet<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// increment the count on the channel (used for selection)
|
||||||
|
fn bump(&mut self, amt: int) -> int {
|
||||||
|
match self.cnt.fetch_add(amt, atomics::SeqCst) {
|
||||||
|
DISCONNECTED => {
|
||||||
|
self.cnt.store(DISCONNECTED, atomics::SeqCst);
|
||||||
|
DISCONNECTED
|
||||||
|
}
|
||||||
|
n => n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Attempts to start selecting on this port. Like a oneshot, this can fail
|
// Attempts to start selecting on this port. Like a oneshot, this can fail
|
||||||
// immediately because of an upgrade.
|
// immediately because of an upgrade.
|
||||||
pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
|
pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
|
||||||
|
|
@ -351,8 +362,8 @@ impl<T: Send> Packet<T> {
|
||||||
};
|
};
|
||||||
// Undo our decrement above, and we should be guaranteed that the
|
// Undo our decrement above, and we should be guaranteed that the
|
||||||
// previous value is positive because we're not going to sleep
|
// previous value is positive because we're not going to sleep
|
||||||
let prev = self.cnt.fetch_add(1, atomics::SeqCst);
|
let prev = self.bump(1);
|
||||||
assert!(prev >= 0);
|
assert!(prev == DISCONNECTED || prev >= 0);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -384,13 +395,12 @@ impl<T: Send> Packet<T> {
|
||||||
// and in the stream case we can have at most one steal, so just assume
|
// and in the stream case we can have at most one steal, so just assume
|
||||||
// that we had one steal.
|
// that we had one steal.
|
||||||
let steals = 1;
|
let steals = 1;
|
||||||
let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst);
|
let prev = self.bump(steals + 1);
|
||||||
|
|
||||||
// If we were previously disconnected, then we know for sure that there
|
// If we were previously disconnected, then we know for sure that there
|
||||||
// is no task in to_wake, so just keep going
|
// is no task in to_wake, so just keep going
|
||||||
let has_data = if prev == DISCONNECTED {
|
let has_data = if prev == DISCONNECTED {
|
||||||
assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
|
assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
|
||||||
self.cnt.store(DISCONNECTED, atomics::SeqCst);
|
|
||||||
true // there is data, that data is that we're disconnected
|
true // there is data, that data is that we're disconnected
|
||||||
} else {
|
} else {
|
||||||
let cur = prev + steals + 1;
|
let cur = prev + steals + 1;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue