Commit 0bee99ab authored by Dmitriy Vyukov's avatar Dmitriy Vyukov

runtime: integrated network poller for darwin

vs tip:
benchmark                           old ns/op    new ns/op    delta
BenchmarkTCP4Persistent                 67786        33175  -51.06%
BenchmarkTCP4Persistent-2               49085        31227  -36.38%
BenchmarkTCP4PersistentTimeout          69265        32565  -52.98%
BenchmarkTCP4PersistentTimeout-2        49217        32588  -33.79%

vs old scheduler:
benchmark                           old ns/op    new ns/op    delta
BenchmarkTCP4Persistent                 63517        33175  -47.77%
BenchmarkTCP4Persistent-2               54760        31227  -42.97%
BenchmarkTCP4PersistentTimeout          63234        32565  -48.50%
BenchmarkTCP4PersistentTimeout-2        56956        32588  -42.78%

R=golang-dev, bradfitz, devon.odell, mikioh.mikioh, iant, rsc
CC=golang-dev, pabuhr
https://golang.org/cl/7569043
parent a11d7d4e
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Waiting for FDs via kqueue/kevent.
package net
import (
"errors"
"os"
"syscall"
)
type pollster struct {
kq int
eventbuf [10]syscall.Kevent_t
events []syscall.Kevent_t
// An event buffer for AddFD/DelFD.
// Must hold pollServer lock.
kbuf [1]syscall.Kevent_t
}
func newpollster() (p *pollster, err error) {
p = new(pollster)
if p.kq, err = syscall.Kqueue(); err != nil {
return nil, os.NewSyscallError("kqueue", err)
}
syscall.CloseOnExec(p.kq)
p.events = p.eventbuf[0:0]
return p, nil
}
// First return value is whether the pollServer should be woken up.
// This version always returns false.
func (p *pollster) AddFD(fd int, mode int, repeat bool) (bool, error) {
// pollServer is locked.
var kmode int
if mode == 'r' {
kmode = syscall.EVFILT_READ
} else {
kmode = syscall.EVFILT_WRITE
}
ev := &p.kbuf[0]
// EV_ADD - add event to kqueue list
// EV_RECEIPT - generate fake EV_ERROR as result of add,
// rather than waiting for real event
// EV_ONESHOT - delete the event the first time it triggers
flags := syscall.EV_ADD | syscall.EV_RECEIPT
if !repeat {
flags |= syscall.EV_ONESHOT
}
syscall.SetKevent(ev, fd, kmode, flags)
n, err := syscall.Kevent(p.kq, p.kbuf[:], p.kbuf[:], nil)
if err != nil {
return false, os.NewSyscallError("kevent", err)
}
if n != 1 || (ev.Flags&syscall.EV_ERROR) == 0 || int(ev.Ident) != fd || int(ev.Filter) != kmode {
return false, errors.New("kqueue phase error")
}
if ev.Data != 0 {
return false, syscall.Errno(ev.Data)
}
return false, nil
}
// Return value is whether the pollServer should be woken up.
// This version always returns false.
func (p *pollster) DelFD(fd int, mode int) bool {
// pollServer is locked.
var kmode int
if mode == 'r' {
kmode = syscall.EVFILT_READ
} else {
kmode = syscall.EVFILT_WRITE
}
ev := &p.kbuf[0]
// EV_DELETE - delete event from kqueue list
// EV_RECEIPT - generate fake EV_ERROR as result of add,
// rather than waiting for real event
syscall.SetKevent(ev, fd, kmode, syscall.EV_DELETE|syscall.EV_RECEIPT)
syscall.Kevent(p.kq, p.kbuf[0:], p.kbuf[0:], nil)
return false
}
func (p *pollster) WaitFD(s *pollServer, nsec int64) (fd int, mode int, err error) {
var t *syscall.Timespec
for len(p.events) == 0 {
if nsec > 0 {
if t == nil {
t = new(syscall.Timespec)
}
*t = syscall.NsecToTimespec(nsec)
}
s.Unlock()
n, err := syscall.Kevent(p.kq, nil, p.eventbuf[:], t)
s.Lock()
if err != nil {
if err == syscall.EINTR {
continue
}
return -1, 0, os.NewSyscallError("kevent", nil)
}
if n == 0 {
return -1, 0, nil
}
p.events = p.eventbuf[:n]
}
ev := &p.events[0]
p.events = p.events[1:]
fd = int(ev.Ident)
if ev.Filter == syscall.EVFILT_READ {
mode = 'r'
} else {
mode = 'w'
}
return fd, mode, nil
}
func (p *pollster) Close() error { return os.NewSyscallError("close", syscall.Close(p.kq)) }
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build darwin
package net
import (
"sync"
"syscall"
"time"
)
func runtime_pollServerInit()
func runtime_pollOpen(fd int) (uintptr, int)
func runtime_pollClose(ctx uintptr)
func runtime_pollWait(ctx uintptr, mode int) int
func runtime_pollReset(ctx uintptr, mode int) int
func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)
func runtime_pollUnblock(ctx uintptr)
var canCancelIO = true // used for testing current package
type pollDesc struct {
runtimeCtx uintptr
}
var serverInit sync.Once
func sysInit() {
}
func (pd *pollDesc) Init(fd *netFD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(fd.sysfd)
if errno != 0 {
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}
func (pd *pollDesc) Close() {
runtime_pollClose(pd.runtimeCtx)
}
func (pd *pollDesc) Lock() {
}
func (pd *pollDesc) Unlock() {
}
func (pd *pollDesc) Wakeup() {
}
// Evict evicts fd from the pending list, unblocking any I/O running on fd.
// Return value is whether the pollServer should be woken up.
func (pd *pollDesc) Evict() bool {
runtime_pollUnblock(pd.runtimeCtx)
return false
}
func (pd *pollDesc) PrepareRead() error {
res := runtime_pollReset(pd.runtimeCtx, 'r')
return convertErr(res)
}
func (pd *pollDesc) PrepareWrite() error {
res := runtime_pollReset(pd.runtimeCtx, 'w')
return convertErr(res)
}
func (pd *pollDesc) WaitRead() error {
res := runtime_pollWait(pd.runtimeCtx, 'r')
return convertErr(res)
}
func (pd *pollDesc) WaitWrite() error {
res := runtime_pollWait(pd.runtimeCtx, 'w')
return convertErr(res)
}
func convertErr(res int) error {
switch res {
case 0:
return nil
case 1:
return errClosing
case 2:
return errTimeout
}
panic("unreachable")
}
func setReadDeadline(fd *netFD, t time.Time) error {
return setDeadlineImpl(fd, t, 'r')
}
func setWriteDeadline(fd *netFD, t time.Time) error {
return setDeadlineImpl(fd, t, 'w')
}
func setDeadline(fd *netFD, t time.Time) error {
return setDeadlineImpl(fd, t, 'r'+'w')
}
func setDeadlineImpl(fd *netFD, t time.Time, mode int) error {
d := t.UnixNano()
if t.IsZero() {
d = 0
}
if err := fd.incref(false); err != nil {
return err
}
runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
fd.decref()
return nil
}
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style // Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// +build darwin freebsd linux netbsd openbsd // +build freebsd linux netbsd openbsd
package net package net
......
...@@ -7,8 +7,8 @@ ...@@ -7,8 +7,8 @@
/* /*
Input to cgo. Input to cgo.
GOARCH=amd64 cgo -cdefs defs_darwin.go >defs_darwin_amd64.h GOARCH=amd64 go tool cgo -cdefs defs_darwin.go >defs_darwin_amd64.h
GOARCH=386 cgo -cdefs defs_darwin.go >defs_darwin_386.h GOARCH=386 go tool cgo -cdefs defs_darwin.go >defs_darwin_386.h
*/ */
package runtime package runtime
...@@ -19,12 +19,17 @@ package runtime ...@@ -19,12 +19,17 @@ package runtime
#include <mach/message.h> #include <mach/message.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/time.h> #include <sys/time.h>
#include <errno.h>
#include <signal.h> #include <signal.h>
#include <sys/event.h>
#include <sys/mman.h> #include <sys/mman.h>
*/ */
import "C" import "C"
const ( const (
EINTR = C.EINTR
EFAULT = C.EFAULT
PROT_NONE = C.PROT_NONE PROT_NONE = C.PROT_NONE
PROT_READ = C.PROT_READ PROT_READ = C.PROT_READ
PROT_WRITE = C.PROT_WRITE PROT_WRITE = C.PROT_WRITE
...@@ -128,6 +133,14 @@ const ( ...@@ -128,6 +133,14 @@ const (
ITIMER_REAL = C.ITIMER_REAL ITIMER_REAL = C.ITIMER_REAL
ITIMER_VIRTUAL = C.ITIMER_VIRTUAL ITIMER_VIRTUAL = C.ITIMER_VIRTUAL
ITIMER_PROF = C.ITIMER_PROF ITIMER_PROF = C.ITIMER_PROF
EV_ADD = C.EV_ADD
EV_DELETE = C.EV_DELETE
EV_CLEAR = C.EV_CLEAR
EV_RECEIPT = C.EV_RECEIPT
EV_ERROR = C.EV_ERROR
EVFILT_READ = C.EVFILT_READ
EVFILT_WRITE = C.EVFILT_WRITE
) )
type MachBody C.mach_msg_body_t type MachBody C.mach_msg_body_t
...@@ -144,6 +157,7 @@ type Sigval C.union_sigval ...@@ -144,6 +157,7 @@ type Sigval C.union_sigval
type Siginfo C.siginfo_t type Siginfo C.siginfo_t
type Timeval C.struct_timeval type Timeval C.struct_timeval
type Itimerval C.struct_itimerval type Itimerval C.struct_itimerval
type Timespec C.struct_timespec
type FPControl C.struct_fp_control type FPControl C.struct_fp_control
type FPStatus C.struct_fp_status type FPStatus C.struct_fp_status
...@@ -161,3 +175,5 @@ type ExceptionState32 C.struct_i386_exception_state ...@@ -161,3 +175,5 @@ type ExceptionState32 C.struct_i386_exception_state
type Mcontext32 C.struct_mcontext32 type Mcontext32 C.struct_mcontext32
type Ucontext C.struct_ucontext type Ucontext C.struct_ucontext
type Kevent C.struct_kevent
...@@ -3,6 +3,9 @@ ...@@ -3,6 +3,9 @@
enum { enum {
EINTR = 0x4,
EFAULT = 0xe,
PROT_NONE = 0x0, PROT_NONE = 0x0,
PROT_READ = 0x1, PROT_READ = 0x1,
PROT_WRITE = 0x2, PROT_WRITE = 0x2,
...@@ -106,6 +109,14 @@ enum { ...@@ -106,6 +109,14 @@ enum {
ITIMER_REAL = 0x0, ITIMER_REAL = 0x0,
ITIMER_VIRTUAL = 0x1, ITIMER_VIRTUAL = 0x1,
ITIMER_PROF = 0x2, ITIMER_PROF = 0x2,
EV_ADD = 0x1,
EV_DELETE = 0x2,
EV_CLEAR = 0x20,
EV_RECEIPT = 0x40,
EV_ERROR = 0x4000,
EVFILT_READ = -0x1,
EVFILT_WRITE = -0x2,
}; };
typedef struct MachBody MachBody; typedef struct MachBody MachBody;
...@@ -117,6 +128,7 @@ typedef struct Sigaction Sigaction; ...@@ -117,6 +128,7 @@ typedef struct Sigaction Sigaction;
typedef struct Siginfo Siginfo; typedef struct Siginfo Siginfo;
typedef struct Timeval Timeval; typedef struct Timeval Timeval;
typedef struct Itimerval Itimerval; typedef struct Itimerval Itimerval;
typedef struct Timespec Timespec;
typedef struct FPControl FPControl; typedef struct FPControl FPControl;
typedef struct FPStatus FPStatus; typedef struct FPStatus FPStatus;
typedef struct RegMMST RegMMST; typedef struct RegMMST RegMMST;
...@@ -130,6 +142,7 @@ typedef struct FloatState32 FloatState32; ...@@ -130,6 +142,7 @@ typedef struct FloatState32 FloatState32;
typedef struct ExceptionState32 ExceptionState32; typedef struct ExceptionState32 ExceptionState32;
typedef struct Mcontext32 Mcontext32; typedef struct Mcontext32 Mcontext32;
typedef struct Ucontext Ucontext; typedef struct Ucontext Ucontext;
typedef struct Kevent Kevent;
#pragma pack on #pragma pack on
...@@ -170,7 +183,7 @@ struct StackT { ...@@ -170,7 +183,7 @@ struct StackT {
typedef byte Sighandler[4]; typedef byte Sighandler[4];
struct Sigaction { struct Sigaction {
Sighandler __sigaction_u; byte __sigaction_u[4];
void *sa_tramp; void *sa_tramp;
uint32 sa_mask; uint32 sa_mask;
int32 sa_flags; int32 sa_flags;
...@@ -185,7 +198,7 @@ struct Siginfo { ...@@ -185,7 +198,7 @@ struct Siginfo {
uint32 si_uid; uint32 si_uid;
int32 si_status; int32 si_status;
byte *si_addr; byte *si_addr;
Sigval si_value; byte si_value[4];
int32 si_band; int32 si_band;
uint32 __pad[7]; uint32 __pad[7];
}; };
...@@ -197,6 +210,10 @@ struct Itimerval { ...@@ -197,6 +210,10 @@ struct Itimerval {
Timeval it_interval; Timeval it_interval;
Timeval it_value; Timeval it_value;
}; };
struct Timespec {
int32 tv_sec;
int32 tv_nsec;
};
struct FPControl { struct FPControl {
byte Pad_cgo_0[2]; byte Pad_cgo_0[2];
...@@ -362,5 +379,14 @@ struct Ucontext { ...@@ -362,5 +379,14 @@ struct Ucontext {
Mcontext32 *uc_mcontext; Mcontext32 *uc_mcontext;
}; };
struct Kevent {
uint32 ident;
int16 filter;
uint16 flags;
uint32 fflags;
int32 data;
byte *udata;
};
#pragma pack off #pragma pack off
...@@ -3,6 +3,9 @@ ...@@ -3,6 +3,9 @@
enum { enum {
EINTR = 0x4,
EFAULT = 0xe,
PROT_NONE = 0x0, PROT_NONE = 0x0,
PROT_READ = 0x1, PROT_READ = 0x1,
PROT_WRITE = 0x2, PROT_WRITE = 0x2,
...@@ -106,6 +109,14 @@ enum { ...@@ -106,6 +109,14 @@ enum {
ITIMER_REAL = 0x0, ITIMER_REAL = 0x0,
ITIMER_VIRTUAL = 0x1, ITIMER_VIRTUAL = 0x1,
ITIMER_PROF = 0x2, ITIMER_PROF = 0x2,
EV_ADD = 0x1,
EV_DELETE = 0x2,
EV_CLEAR = 0x20,
EV_RECEIPT = 0x40,
EV_ERROR = 0x4000,
EVFILT_READ = -0x1,
EVFILT_WRITE = -0x2,
}; };
typedef struct MachBody MachBody; typedef struct MachBody MachBody;
...@@ -117,6 +128,7 @@ typedef struct Sigaction Sigaction; ...@@ -117,6 +128,7 @@ typedef struct Sigaction Sigaction;
typedef struct Siginfo Siginfo; typedef struct Siginfo Siginfo;
typedef struct Timeval Timeval; typedef struct Timeval Timeval;
typedef struct Itimerval Itimerval; typedef struct Itimerval Itimerval;
typedef struct Timespec Timespec;
typedef struct FPControl FPControl; typedef struct FPControl FPControl;
typedef struct FPStatus FPStatus; typedef struct FPStatus FPStatus;
typedef struct RegMMST RegMMST; typedef struct RegMMST RegMMST;
...@@ -130,6 +142,7 @@ typedef struct FloatState32 FloatState32; ...@@ -130,6 +142,7 @@ typedef struct FloatState32 FloatState32;
typedef struct ExceptionState32 ExceptionState32; typedef struct ExceptionState32 ExceptionState32;
typedef struct Mcontext32 Mcontext32; typedef struct Mcontext32 Mcontext32;
typedef struct Ucontext Ucontext; typedef struct Ucontext Ucontext;
typedef struct Kevent Kevent;
#pragma pack on #pragma pack on
...@@ -171,7 +184,7 @@ struct StackT { ...@@ -171,7 +184,7 @@ struct StackT {
typedef byte Sighandler[8]; typedef byte Sighandler[8];
struct Sigaction { struct Sigaction {
Sighandler __sigaction_u; byte __sigaction_u[8];
void *sa_tramp; void *sa_tramp;
uint32 sa_mask; uint32 sa_mask;
int32 sa_flags; int32 sa_flags;
...@@ -186,7 +199,7 @@ struct Siginfo { ...@@ -186,7 +199,7 @@ struct Siginfo {
uint32 si_uid; uint32 si_uid;
int32 si_status; int32 si_status;
byte *si_addr; byte *si_addr;
Sigval si_value; byte si_value[8];
int64 si_band; int64 si_band;
uint64 __pad[7]; uint64 __pad[7];
}; };
...@@ -199,6 +212,10 @@ struct Itimerval { ...@@ -199,6 +212,10 @@ struct Itimerval {
Timeval it_interval; Timeval it_interval;
Timeval it_value; Timeval it_value;
}; };
struct Timespec {
int64 tv_sec;
int64 tv_nsec;
};
struct FPControl { struct FPControl {
byte Pad_cgo_0[2]; byte Pad_cgo_0[2];
...@@ -365,5 +382,14 @@ struct Ucontext { ...@@ -365,5 +382,14 @@ struct Ucontext {
Mcontext64 *uc_mcontext; Mcontext64 *uc_mcontext;
}; };
struct Kevent {
uint64 ident;
int16 filter;
uint16 flags;
uint32 fflags;
int64 data;
byte *udata;
};
#pragma pack off #pragma pack off
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package net
#include "runtime.h"
#include "defs_GOOS_GOARCH.h"
#include "arch_GOARCH.h"
#include "malloc.h"
// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue) must define the following functions:
// void runtime·netpollinit(void); // to initialize the poller
// int32 runtime·netpollopen(int32 fd, PollDesc *pd); // to arm edge-triggered notifications
// and associate fd with pd.
// An implementation must call the following function to denote that the pd is ready.
// void runtime·netpollready(G **gpp, PollDesc *pd, int32 mode);
#define READY ((G*)1)
struct PollDesc
{
PollDesc* link; // in pollcache, protected by pollcache.Lock
Lock; // protectes the following fields
bool closing;
uintptr seq; // protects from stale timers and ready notifications
G* rg; // G waiting for read or READY (binary semaphore)
Timer rt; // read deadline timer (set if rt.fv != nil)
int64 rd; // read deadline
G* wg; // the same for writes
Timer wt;
int64 wd;
};
static struct
{
Lock;
PollDesc* first;
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
} pollcache;
static void netpollblock(PollDesc*, int32);
static G* netpollunblock(PollDesc*, int32);
static void deadline(int64, Eface);
static void readDeadline(int64, Eface);
static void writeDeadline(int64, Eface);
static PollDesc* allocPollDesc(void);
static intgo checkerr(PollDesc *pd, int32 mode);
static FuncVal deadlineFn = {(void(*)(void))deadline};
static FuncVal readDeadlineFn = {(void(*)(void))readDeadline};
static FuncVal writeDeadlineFn = {(void(*)(void))writeDeadline};
func runtime_pollServerInit() {
runtime·netpollinit();
}
func runtime_pollOpen(fd int) (pd *PollDesc, errno int) {
pd = allocPollDesc();
runtime·lock(pd);
if(pd->wg != nil && pd->wg != READY)
runtime·throw("runtime_pollOpen: blocked write on free descriptor");
if(pd->rg != nil && pd->rg != READY)
runtime·throw("runtime_pollOpen: blocked read on free descriptor");
pd->closing = false;
pd->seq++;
pd->rg = nil;
pd->rd = 0;
pd->wg = nil;
pd->wd = 0;
runtime·unlock(pd);
errno = runtime·netpollopen(fd, pd);
}
func runtime_pollClose(pd *PollDesc) {
if(!pd->closing)
runtime·throw("runtime_pollClose: close w/o unblock");
if(pd->wg != nil && pd->wg != READY)
runtime·throw("runtime_pollClose: blocked write on closing descriptor");
if(pd->rg != nil && pd->rg != READY)
runtime·throw("runtime_pollClose: blocked read on closing descriptor");
runtime·lock(&pollcache);
pd->link = pollcache.first;
pollcache.first = pd;
runtime·unlock(&pollcache);
}
func runtime_pollReset(pd *PollDesc, mode int) (err int) {
runtime·lock(pd);
err = checkerr(pd, mode);
if(err)
goto ret;
if(mode == 'r')
pd->rg = nil;
else if(mode == 'w')
pd->wg = nil;
ret:
runtime·unlock(pd);
}
func runtime_pollWait(pd *PollDesc, mode int) (err int) {
runtime·lock(pd);
err = checkerr(pd, mode);
if(err)
goto ret;
netpollblock(pd, mode);
err = checkerr(pd, mode);
ret:
runtime·unlock(pd);
}
func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) {
runtime·lock(pd);
if(pd->closing)
goto ret;
pd->seq++; // invalidate current timers
// Reset current timers.
if(pd->rt.fv) {
runtime·deltimer(&pd->rt);
pd->rt.fv = nil;
}
if(pd->wt.fv) {
runtime·deltimer(&pd->wt);
pd->wt.fv = nil;
}
// Setup new timers.
if(d != 0 && d <= runtime·nanotime()) {
d = -1;
}
if(mode == 'r' || mode == 'r'+'w')
pd->rd = d;
if(mode == 'w' || mode == 'r'+'w')
pd->wd = d;
if(pd->rd > 0 && pd->rd == pd->wd) {
pd->rt.fv = &deadlineFn;
pd->rt.when = pd->rd;
// Copy current seq into the timer arg.
// Timer func will check the seq against current descriptor seq,
// if they differ the descriptor was reused or timers were reset.
pd->rt.arg.type = (Type*)pd->seq;
pd->rt.arg.data = pd;
runtime·addtimer(&pd->rt);
} else {
if(pd->rd > 0) {
pd->rt.fv = &readDeadlineFn;
pd->rt.when = pd->rd;
pd->rt.arg.type = (Type*)pd->seq;
pd->rt.arg.data = pd;
runtime·addtimer(&pd->rt);
}
if(pd->wd > 0) {
pd->wt.fv = &writeDeadlineFn;
pd->wt.when = pd->wd;
pd->wt.arg.type = (Type*)pd->seq;
pd->wt.arg.data = pd;
runtime·addtimer(&pd->wt);
}
}
ret:
runtime·unlock(pd);
}
func runtime_pollUnblock(pd *PollDesc) {
G *rg, *wg;
runtime·lock(pd);
if(pd->closing)
runtime·throw("runtime_pollUnblock: already closing");
pd->closing = true;
pd->seq++;
rg = netpollunblock(pd, 'r');
wg = netpollunblock(pd, 'w');
if(pd->rt.fv) {
runtime·deltimer(&pd->rt);
pd->rt.fv = nil;
}
if(pd->wt.fv) {
runtime·deltimer(&pd->wt);
pd->wt.fv = nil;
}
runtime·unlock(pd);
if(rg)
runtime·ready(rg);
if(wg)
runtime·ready(wg);
}
// make pd ready, newly runnable goroutines (if any) are enqueued info gpp list
void
runtime·netpollready(G **gpp, PollDesc *pd, int32 mode)
{
G *rg, *wg;
rg = wg = nil;
runtime·lock(pd);
if(mode == 'r' || mode == 'r'+'w')
rg = netpollunblock(pd, 'r');
if(mode == 'w' || mode == 'r'+'w')
wg = netpollunblock(pd, 'w');
runtime·unlock(pd);
if(rg) {
rg->schedlink = *gpp;
*gpp = rg;
}
if(wg) {
wg->schedlink = *gpp;
*gpp = wg;
}
}
static intgo
checkerr(PollDesc *pd, int32 mode)
{
if(pd->closing)
return 1; // errClosing
if((mode == 'r' && pd->rd < 0) || (mode == 'w' && pd->wd < 0))
return 2; // errTimeout
return 0;
}
static void
netpollblock(PollDesc *pd, int32 mode)
{
G **gpp;
gpp = &pd->rg;
if(mode == 'w')
gpp = &pd->wg;
if(*gpp == READY) {
*gpp = nil;
return;
}
if(*gpp != nil)
runtime·throw("epoll: double wait");
*gpp = g;
runtime·park(runtime·unlock, &pd->Lock, "IO wait");
runtime·lock(pd);
}
static G*
netpollunblock(PollDesc *pd, int32 mode)
{
G **gpp, *old;
gpp = &pd->rg;
if(mode == 'w')
gpp = &pd->wg;
if(*gpp == READY)
return nil;
if(*gpp == nil) {
*gpp = READY;
return nil;
}
old = *gpp;
*gpp = nil;
return old;
}
static void
deadlineimpl(int64 now, Eface arg, bool read, bool write)
{
PollDesc *pd;
uint32 seq;
G *rg, *wg;
USED(now);
pd = (PollDesc*)arg.data;
// This is the seq when the timer was set.
// If it's stale, ignore the timer event.
seq = (uintptr)arg.type;
rg = wg = nil;
runtime·lock(pd);
if(seq != pd->seq) {
// The descriptor was reused or timers were reset.
runtime·unlock(pd);
return;
}
if(read) {
if(pd->rd <= 0 || pd->rt.fv == nil)
runtime·throw("deadlineimpl: inconsistent read deadline");
pd->rd = -1;
pd->rt.fv = nil;
rg = netpollunblock(pd, 'r');
}
if(write) {
if(pd->wd <= 0 || (pd->wt.fv == nil && !read))
runtime·throw("deadlineimpl: inconsistent write deadline");
pd->wd = -1;
pd->wt.fv = nil;
wg = netpollunblock(pd, 'w');
}
runtime·unlock(pd);
if(rg)
runtime·ready(rg);
if(wg)
runtime·ready(wg);
}
static void
deadline(int64 now, Eface arg)
{
deadlineimpl(now, arg, true, true);
}
static void
readDeadline(int64 now, Eface arg)
{
deadlineimpl(now, arg, true, false);
}
static void
writeDeadline(int64 now, Eface arg)
{
deadlineimpl(now, arg, false, true);
}
static PollDesc*
allocPollDesc(void)
{
PollDesc *pd;
uint32 i, n;
runtime·lock(&pollcache);
if(pollcache.first == nil) {
n = PageSize/sizeof(*pd);
if(n == 0)
n = 1;
// Must be in non-GC memory because can be referenced
// only from epoll/kqueue internals.
pd = runtime·SysAlloc(n*sizeof(*pd));
for(i = 0; i < n; i++) {
pd[i].link = pollcache.first;
pollcache.first = &pd[i];
}
}
pd = pollcache.first;
pollcache.first = pd->link;
runtime·unlock(&pollcache);
return pd;
}
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build darwin
#include "runtime.h"
#include "defs_GOOS_GOARCH.h"
// Integrated network poller (kqueue-based implementation).
int32 runtime·kqueue(void);
int32 runtime·kevent(int32, Kevent*, int32, Kevent*, int32, Timespec*);
void runtime·closeonexec(int32);
static int32 kq = -1;
void
runtime·netpollinit(void)
{
kq = runtime·kqueue();
if(kq < 0) {
runtime·printf("netpollinit: kqueue failed with %d\n", -kq);
runtime·throw("netpollinit: kqueue failed");
}
runtime·closeonexec(kq);
}
int32
runtime·netpollopen(int32 fd, PollDesc *pd)
{
Kevent ev[2];
int32 n;
// Arm both EVFILT_READ and EVFILT_WRITE in edge-triggered mode (EV_CLEAR)
// for the whole fd lifetime. The notifications are automatically unregistered
// when fd is closed.
ev[0].ident = fd;
ev[0].filter = EVFILT_READ;
ev[0].flags = EV_ADD|EV_RECEIPT|EV_CLEAR;
ev[0].fflags = 0;
ev[0].data = 0;
ev[0].udata = (byte*)pd;
ev[1] = ev[0];
ev[1].filter = EVFILT_WRITE;
n = runtime·kevent(kq, ev, 2, ev, 2, nil);
if(n < 0)
return -n;
if(n != 2 ||
(ev[0].flags&EV_ERROR) == 0 || ev[0].ident != fd || ev[0].filter != EVFILT_READ ||
(ev[1].flags&EV_ERROR) == 0 || ev[1].ident != fd || ev[1].filter != EVFILT_WRITE)
return EFAULT; // just to mark out from other errors
if(ev[0].data != 0)
return ev[0].data;
if(ev[1].data != 0)
return ev[1].data;
return 0;
}
// Polls for ready network connections.
// Returns list of goroutines that become runnable.
G*
runtime·netpoll(bool block)
{
Kevent events[64], *ev;
Timespec ts, *tp;
int32 n, i;
G *gp;
if(kq == -1)
return nil;
tp = nil;
if(!block) {
ts.tv_sec = 0;
ts.tv_nsec = 0;
tp = &ts;
}
gp = nil;
retry:
n = runtime·kevent(kq, nil, 0, events, nelem(events), tp);
if(n < 0) {
if(n != -EINTR)
runtime·printf("kqueue failed with %d\n", -n);
goto retry;
}
for(i = 0; i < n; i++) {
ev = &events[i];
if(ev->filter == EVFILT_READ)
runtime·netpollready(&gp, (PollDesc*)ev->udata, 'r');
if(ev->filter == EVFILT_WRITE)
runtime·netpollready(&gp, (PollDesc*)ev->udata, 'w');
}
if(block && gp == nil)
goto retry;
return gp;
}
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style // Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// +build freebsd linux netbsd openbsd plan9 windows
#include "runtime.h" #include "runtime.h"
// Polls for ready network connections. // Polls for ready network connections.
......
...@@ -85,6 +85,7 @@ typedef struct LFNode LFNode; ...@@ -85,6 +85,7 @@ typedef struct LFNode LFNode;
typedef struct ParFor ParFor; typedef struct ParFor ParFor;
typedef struct ParForThread ParForThread; typedef struct ParForThread ParForThread;
typedef struct CgoMal CgoMal; typedef struct CgoMal CgoMal;
typedef struct PollDesc PollDesc;
/* /*
* Per-CPU declaration. * Per-CPU declaration.
...@@ -786,6 +787,9 @@ extern int64 runtime·blockprofilerate; ...@@ -786,6 +787,9 @@ extern int64 runtime·blockprofilerate;
void runtime·addtimer(Timer*); void runtime·addtimer(Timer*);
bool runtime·deltimer(Timer*); bool runtime·deltimer(Timer*);
G* runtime·netpoll(bool); G* runtime·netpoll(bool);
void runtime·netpollinit(void);
int32 runtime·netpollopen(int32, PollDesc*);
void runtime·netpollready(G**, PollDesc*, int32);
#pragma varargck argpos runtime·printf 1 #pragma varargck argpos runtime·printf 1
#pragma varargck type "d" int32 #pragma varargck type "d" int32
......
...@@ -488,3 +488,32 @@ TEXT runtime·sysctl(SB),7,$0 ...@@ -488,3 +488,32 @@ TEXT runtime·sysctl(SB),7,$0
RET RET
MOVL $0, AX MOVL $0, AX
RET RET
// int32 runtime·kqueue(void);
TEXT runtime·kqueue(SB),7,$0
MOVL $362, AX
INT $0x80
JAE 2(PC)
NEGL AX
RET
// int32 runtime·kevent(int kq, Kevent *changelist, int nchanges, Kevent *eventlist, int nevents, Timespec *timeout);
TEXT runtime·kevent(SB),7,$0
MOVL $363, AX
INT $0x80
JAE 2(PC)
NEGL AX
RET
// int32 runtime·closeonexec(int32 fd);
TEXT runtime·closeonexec(SB),7,$32
MOVL $92, AX // fcntl
// 0(SP) is where the caller PC would be; kernel skips it
MOVL fd+0(FP), BX
MOVL BX, 4(SP) // fd
MOVL $2, 8(SP) // F_SETFD
MOVL $1, 12(SP) // FD_CLOEXEC
INT $0x80
JAE 2(PC)
NEGL AX
RET
...@@ -439,3 +439,37 @@ TEXT runtime·sysctl(SB),7,$0 ...@@ -439,3 +439,37 @@ TEXT runtime·sysctl(SB),7,$0
RET RET
MOVL $0, AX MOVL $0, AX
RET RET
// int32 runtime·kqueue(void);
TEXT runtime·kqueue(SB),7,$0
MOVQ $0, DI
MOVQ $0, SI
MOVQ $0, DX
MOVL $(0x2000000+362), AX
SYSCALL
JCC 2(PC)
NEGL AX
RET
// int32 runtime·kevent(int kq, Kevent *changelist, int nchanges, Kevent *eventlist, int nevents, Timespec *timeout);
TEXT runtime·kevent(SB),7,$0
MOVL 8(SP), DI
MOVQ 16(SP), SI
MOVL 24(SP), DX
MOVQ 32(SP), R10
MOVL 40(SP), R8
MOVQ 48(SP), R9
MOVL $(0x2000000+363), AX
SYSCALL
JCC 2(PC)
NEGL AX
RET
// void runtime·closeonexec(int32 fd);
TEXT runtime·closeonexec(SB),7,$0
MOVL 8(SP), DI // fd
MOVQ $2, SI // F_SETFD
MOVQ $1, DX // FD_CLOEXEC
MOVL $(0x2000000+92), AX // fcntl
SYSCALL
RET
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment