Commit 4523ee9a authored by Ken Thompson's avatar Ken Thompson

close/closed on chans

R=r
OCL=26281
CL=26285
parent db3a21d7
......@@ -9,10 +9,10 @@ static Lock chanlock;
enum
{
Wclosed = 0x0001,
Rclosed = 0xfffe,
Rincr = 0x0002,
Rmax = 0x8000,
Wclosed = 0x0001, // writer has closed
Rclosed = 0x0002, // reader has seen close
Eincr = 0x0004, // increment errors
Emax = 0x0800, // error limit before throw
};
typedef struct Hchan Hchan;
......@@ -41,8 +41,7 @@ struct WaitQ
struct Hchan
{
uint16 elemsize;
uint16 closed; // Wclosed closed() hash been called
// Rclosed read-count after closed()
uint16 closed; // Wclosed Rclosed errorcount
uint32 dataqsiz; // size of the circular q
uint32 qcount; // total data in the q
Alg* elemalg; // interface for element type
......@@ -143,6 +142,16 @@ sys·newchan(uint32 elemsize, uint32 elemalg, uint32 hint,
}
}
static void
incerr(Hchan* c)
{
c->closed += Eincr;
if(c->closed & Emax) {
unlock(&chanlock);
throw("too many operations on a closed channel");
}
}
/*
* generic single channel send/recv
* if the bool pointer is nil,
......@@ -167,9 +176,13 @@ sendchan(Hchan *c, byte *ep, bool *pres)
}
lock(&chanlock);
if(c->dataqsiz > 0)
goto asynch;
if(c->closed & Wclosed)
goto closed;
sg = dequeue(&c->recvq, c);
if(sg != nil) {
if(ep != nil)
......@@ -209,7 +222,10 @@ sendchan(Hchan *c, byte *ep, bool *pres)
return;
asynch:
while(c->qcount >= c->dataqsiz) {
if(c->closed & Wclosed)
goto closed;
if(c->qcount >= c->dataqsiz) {
if(pres != nil) {
unlock(&chanlock);
*pres = false;
......@@ -222,6 +238,7 @@ asynch:
sys·Gosched();
lock(&chanlock);
goto asynch;
}
if(ep != nil)
c->elemalg->copy(c->elemsize, c->senddataq->elem, ep);
......@@ -238,6 +255,13 @@ asynch:
unlock(&chanlock);
if(pres != nil)
*pres = true;
return;
closed:
incerr(c);
if(pres != nil)
*pres = false;
unlock(&chanlock);
}
static void
......@@ -256,6 +280,9 @@ chanrecv(Hchan* c, byte *ep, bool* pres)
if(c->dataqsiz > 0)
goto asynch;
if(c->closed & Wclosed)
goto closed;
sg = dequeue(&c->sendq, c);
if(sg != nil) {
c->elemalg->copy(c->elemsize, ep, sg->elem);
......@@ -285,6 +312,12 @@ chanrecv(Hchan* c, byte *ep, bool* pres)
lock(&chanlock);
sg = g->param;
if(c->closed & Wclosed) {
freesg(c, sg);
goto closed;
}
c->elemalg->copy(c->elemsize, ep, sg->elem);
freesg(c, sg);
unlock(&chanlock);
......@@ -293,7 +326,10 @@ chanrecv(Hchan* c, byte *ep, bool* pres)
return;
asynch:
while(c->qcount <= 0) {
if(c->qcount <= 0) {
if(c->closed & Wclosed)
goto closed;
if(pres != nil) {
unlock(&chanlock);
*pres = false;
......@@ -304,7 +340,9 @@ asynch:
enqueue(&c->recvq, sg);
unlock(&chanlock);
sys·Gosched();
lock(&chanlock);
goto asynch;
}
c->elemalg->copy(c->elemsize, ep, c->recvdataq->elem);
c->recvdataq = c->recvdataq->link;
......@@ -315,10 +353,23 @@ asynch:
freesg(c, sg);
unlock(&chanlock);
ready(gp);
} else
unlock(&chanlock);
if(pres != nil)
*pres = true;
return;
}
unlock(&chanlock);
if(pres != nil)
*pres = true;
return;
closed:
c->elemalg->copy(c->elemsize, ep, nil);
c->closed |= Rclosed;
incerr(c);
if(pres != nil)
*pres = false;
unlock(&chanlock);
}
// chansend1(hchan *chan any, elem any);
......@@ -602,10 +653,14 @@ loop:
if(cas->send) {
if(c->qcount < c->dataqsiz)
goto asyns;
if(c->closed & Wclosed)
goto gots;
goto next1;
}
if(c->qcount > 0)
goto asynr;
if(c->closed & Wclosed)
goto gotr;
goto next1;
}
......@@ -613,11 +668,15 @@ loop:
sg = dequeue(&c->recvq, c);
if(sg != nil)
goto gots;
if(c->closed & Wclosed)
goto gots;
goto next1;
}
sg = dequeue(&c->sendq, c);
if(sg != nil)
goto gotr;
if(c->closed & Wclosed)
goto gotr;
next1:
o += p;
......@@ -764,6 +823,13 @@ gotr:
sys·printint(o);
prints("\n");
}
if(c->closed & Wclosed) {
if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, nil);
c->closed |= Rclosed;
incerr(c);
goto retc;
}
if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem);
gp = sg->g;
......@@ -782,6 +848,10 @@ gots:
sys·printint(o);
prints("\n");
}
if(c->closed & Wclosed) {
incerr(c);
goto retc;
}
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
gp = sg->g;
gp->param = sg;
......@@ -803,35 +873,47 @@ retc:
void
sys·closechan(Hchan *c)
{
if(c == nil)
throw("closechan: channel not allocated");
// if wclosed already set
// work has been done - just return
if(c->closed & Wclosed)
return;
SudoG *sg;
G* gp;
// set wclosed
lock(&chanlock);
incerr(c);
c->closed |= Wclosed;
// release all readers
for(;;) {
sg = dequeue(&c->recvq, c);
if(sg == nil)
break;
gp = sg->g;
gp->param = nil;
freesg(c, sg);
ready(gp);
}
// release all writers
for(;;) {
sg = dequeue(&c->sendq, c);
if(sg == nil)
break;
gp = sg->g;
gp->param = nil;
freesg(c, sg);
ready(gp);
}
unlock(&chanlock);
}
// closedchan(sel *byte) bool;
void
sys·closedchan(Hchan *c, bool closed)
{
if(c == nil)
throw("closedchan: channel not allocated");
// test Rclosed
closed = 0;
// test rclosed
if(c->closed & Rclosed) {
// see if rclosed has been set a lot
if(c->closed & Rmax)
throw("closedchan: ignored");
c->closed += Rincr;
if(c->closed & Rclosed)
closed = 1;
}
FLUSH(&closed);
}
......@@ -892,11 +974,13 @@ allocsg(Hchan *c)
static void
freesg(Hchan *c, SudoG *sg)
{
if(sg->isfree)
throw("chan.freesg: already free");
sg->isfree = 1;
sg->link = c->free;
c->free = sg;
if(sg != nil) {
if(sg->isfree)
throw("chan.freesg: already free");
sg->isfree = 1;
sg->link = c->free;
c->free = sg;
}
}
static uint32
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment