Commit 8486d96a authored by Alex Brainman's avatar Alex Brainman

runtime: change netpoll in preparation for windows implementation

- change runtime_pollWait so it does not return
  closed or timeout if IO is ready - windows must
  know if IO has completed or not even after
  interruption;
- add (*pollDesc).Prepare(mode int) that can be
  used for both read and write, same for Wait;
- introduce runtime_pollWaitCanceled and expose
  it in net as (*pollDesc).WaitCanceled(mode int);

Full windows netpoll changes are
here https://golang.org/cl/8670044/.

R=golang-dev, dvyukov
CC=golang-dev
https://golang.org/cl/10485043
parent 02991bb9
...@@ -16,6 +16,7 @@ func runtime_pollServerInit() ...@@ -16,6 +16,7 @@ func runtime_pollServerInit()
func runtime_pollOpen(fd uintptr) (uintptr, int) func runtime_pollOpen(fd uintptr) (uintptr, int)
func runtime_pollClose(ctx uintptr) func runtime_pollClose(ctx uintptr)
func runtime_pollWait(ctx uintptr, mode int) int func runtime_pollWait(ctx uintptr, mode int) int
func runtime_pollWaitCanceled(ctx uintptr, mode int) int
func runtime_pollReset(ctx uintptr, mode int) int func runtime_pollReset(ctx uintptr, mode int) int
func runtime_pollSetDeadline(ctx uintptr, d int64, mode int) func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)
func runtime_pollUnblock(ctx uintptr) func runtime_pollUnblock(ctx uintptr)
...@@ -56,24 +57,42 @@ func (pd *pollDesc) Evict() bool { ...@@ -56,24 +57,42 @@ func (pd *pollDesc) Evict() bool {
return false return false
} }
func (pd *pollDesc) PrepareRead() error { func (pd *pollDesc) Prepare(mode int) error {
res := runtime_pollReset(pd.runtimeCtx, 'r') res := runtime_pollReset(pd.runtimeCtx, mode)
return convertErr(res) return convertErr(res)
} }
func (pd *pollDesc) PrepareRead() error {
return pd.Prepare('r')
}
func (pd *pollDesc) PrepareWrite() error { func (pd *pollDesc) PrepareWrite() error {
res := runtime_pollReset(pd.runtimeCtx, 'w') return pd.Prepare('w')
}
func (pd *pollDesc) Wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res) return convertErr(res)
} }
func (pd *pollDesc) WaitRead() error { func (pd *pollDesc) WaitRead() error {
res := runtime_pollWait(pd.runtimeCtx, 'r') return pd.Wait('r')
return convertErr(res)
} }
func (pd *pollDesc) WaitWrite() error { func (pd *pollDesc) WaitWrite() error {
res := runtime_pollWait(pd.runtimeCtx, 'w') return pd.Wait('w')
return convertErr(res) }
func (pd *pollDesc) WaitCanceled(mode int) {
runtime_pollWaitCanceled(pd.runtimeCtx, mode)
}
func (pd *pollDesc) WaitCanceledRead() {
pd.WaitCanceled('r')
}
func (pd *pollDesc) WaitCanceledWrite() {
pd.WaitCanceled('w')
} }
func convertErr(res int) error { func convertErr(res int) error {
...@@ -85,6 +104,7 @@ func convertErr(res int) error { ...@@ -85,6 +104,7 @@ func convertErr(res int) error {
case 2: case 2:
return errTimeout return errTimeout
} }
println("unreachable: ", res)
panic("unreachable") panic("unreachable")
} }
......
...@@ -47,8 +47,8 @@ static struct ...@@ -47,8 +47,8 @@ static struct
// seq is incremented when deadlines are changed or descriptor is reused. // seq is incremented when deadlines are changed or descriptor is reused.
} pollcache; } pollcache;
static void netpollblock(PollDesc*, int32); static bool netpollblock(PollDesc*, int32);
static G* netpollunblock(PollDesc*, int32); static G* netpollunblock(PollDesc*, int32, bool);
static void deadline(int64, Eface); static void deadline(int64, Eface);
static void readDeadline(int64, Eface); static void readDeadline(int64, Eface);
static void writeDeadline(int64, Eface); static void writeDeadline(int64, Eface);
...@@ -112,11 +112,21 @@ ret: ...@@ -112,11 +112,21 @@ ret:
func runtime_pollWait(pd *PollDesc, mode int) (err int) { func runtime_pollWait(pd *PollDesc, mode int) (err int) {
runtime·lock(pd); runtime·lock(pd);
err = checkerr(pd, mode); err = checkerr(pd, mode);
if(err) if(err == 0) {
goto ret; if(!netpollblock(pd, mode)) {
netpollblock(pd, mode); err = checkerr(pd, mode);
err = checkerr(pd, mode); if(err == 0)
ret: runtime·throw("runtime_pollWait: unblocked by ioready");
}
}
runtime·unlock(pd);
}
func runtime_pollWaitCanceled(pd *PollDesc, mode int) {
runtime·lock(pd);
// wait for ioready, ignore closing or timeouts.
while(!netpollblock(pd, mode))
;
runtime·unlock(pd); runtime·unlock(pd);
} }
...@@ -179,8 +189,8 @@ func runtime_pollUnblock(pd *PollDesc) { ...@@ -179,8 +189,8 @@ func runtime_pollUnblock(pd *PollDesc) {
runtime·throw("runtime_pollUnblock: already closing"); runtime·throw("runtime_pollUnblock: already closing");
pd->closing = true; pd->closing = true;
pd->seq++; pd->seq++;
rg = netpollunblock(pd, 'r'); rg = netpollunblock(pd, 'r', false);
wg = netpollunblock(pd, 'w'); wg = netpollunblock(pd, 'w', false);
if(pd->rt.fv) { if(pd->rt.fv) {
runtime·deltimer(&pd->rt); runtime·deltimer(&pd->rt);
pd->rt.fv = nil; pd->rt.fv = nil;
...@@ -205,9 +215,9 @@ runtime·netpollready(G **gpp, PollDesc *pd, int32 mode) ...@@ -205,9 +215,9 @@ runtime·netpollready(G **gpp, PollDesc *pd, int32 mode)
rg = wg = nil; rg = wg = nil;
runtime·lock(pd); runtime·lock(pd);
if(mode == 'r' || mode == 'r'+'w') if(mode == 'r' || mode == 'r'+'w')
rg = netpollunblock(pd, 'r'); rg = netpollunblock(pd, 'r', true);
if(mode == 'w' || mode == 'r'+'w') if(mode == 'w' || mode == 'r'+'w')
wg = netpollunblock(pd, 'w'); wg = netpollunblock(pd, 'w', true);
runtime·unlock(pd); runtime·unlock(pd);
if(rg) { if(rg) {
rg->schedlink = *gpp; rg->schedlink = *gpp;
...@@ -229,7 +239,8 @@ checkerr(PollDesc *pd, int32 mode) ...@@ -229,7 +239,8 @@ checkerr(PollDesc *pd, int32 mode)
return 0; return 0;
} }
static void // returns true if IO is ready, or false if timedout or closed
static bool
netpollblock(PollDesc *pd, int32 mode) netpollblock(PollDesc *pd, int32 mode)
{ {
G **gpp; G **gpp;
...@@ -239,17 +250,20 @@ netpollblock(PollDesc *pd, int32 mode) ...@@ -239,17 +250,20 @@ netpollblock(PollDesc *pd, int32 mode)
gpp = &pd->wg; gpp = &pd->wg;
if(*gpp == READY) { if(*gpp == READY) {
*gpp = nil; *gpp = nil;
return; return true;
} }
if(*gpp != nil) if(*gpp != nil)
runtime·throw("epoll: double wait"); runtime·throw("epoll: double wait");
*gpp = g; *gpp = g;
runtime·park(runtime·unlock, &pd->Lock, "IO wait"); runtime·park(runtime·unlock, &pd->Lock, "IO wait");
runtime·lock(pd); runtime·lock(pd);
if(g->param)
return true;
return false;
} }
static G* static G*
netpollunblock(PollDesc *pd, int32 mode) netpollunblock(PollDesc *pd, int32 mode, bool ioready)
{ {
G **gpp, *old; G **gpp, *old;
...@@ -259,10 +273,15 @@ netpollunblock(PollDesc *pd, int32 mode) ...@@ -259,10 +273,15 @@ netpollunblock(PollDesc *pd, int32 mode)
if(*gpp == READY) if(*gpp == READY)
return nil; return nil;
if(*gpp == nil) { if(*gpp == nil) {
*gpp = READY; // Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
if(ioready)
*gpp = READY;
return nil; return nil;
} }
old = *gpp; old = *gpp;
// pass unblock reason onto blocked g
old->param = (void*)ioready;
*gpp = nil; *gpp = nil;
return old; return old;
} }
...@@ -291,14 +310,14 @@ deadlineimpl(int64 now, Eface arg, bool read, bool write) ...@@ -291,14 +310,14 @@ deadlineimpl(int64 now, Eface arg, bool read, bool write)
runtime·throw("deadlineimpl: inconsistent read deadline"); runtime·throw("deadlineimpl: inconsistent read deadline");
pd->rd = -1; pd->rd = -1;
pd->rt.fv = nil; pd->rt.fv = nil;
rg = netpollunblock(pd, 'r'); rg = netpollunblock(pd, 'r', false);
} }
if(write) { if(write) {
if(pd->wd <= 0 || (pd->wt.fv == nil && !read)) if(pd->wd <= 0 || (pd->wt.fv == nil && !read))
runtime·throw("deadlineimpl: inconsistent write deadline"); runtime·throw("deadlineimpl: inconsistent write deadline");
pd->wd = -1; pd->wd = -1;
pd->wt.fv = nil; pd->wt.fv = nil;
wg = netpollunblock(pd, 'w'); wg = netpollunblock(pd, 'w', false);
} }
runtime·unlock(pd); runtime·unlock(pd);
if(rg) if(rg)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment