Commit 42bc7fc8 authored by Ian Lance Taylor's avatar Ian Lance Taylor

net: Don't force epoll/kqueue to wake up in order to add new events.

In conjunction with the non-blocking system call CL, this
gives about an 8% performance improvement on a client/server
test running on my local machine.

R=rsc, iant2
CC=golang-dev
https://golang.org/cl/4272057
parent c0f3b6c8
...@@ -2,8 +2,6 @@ ...@@ -2,8 +2,6 @@
// Use of this source code is governed by a BSD-style // Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// TODO(rsc): All the prints in this file should go to standard error.
package net package net
import ( import (
...@@ -85,11 +83,12 @@ func (e *InvalidConnError) Timeout() bool { return false } ...@@ -85,11 +83,12 @@ func (e *InvalidConnError) Timeout() bool { return false }
// will the fd be closed. // will the fd be closed.
type pollServer struct { type pollServer struct {
cr, cw chan *netFD // buffered >= 1 cr, cw chan *netFD // buffered >= 1
pr, pw *os.File pr, pw *os.File
pending map[int]*netFD poll *pollster // low-level OS hooks
poll *pollster // low-level OS hooks sync.Mutex // controls pending and deadline
deadline int64 // next deadline (nsec since 1970) pending map[int]*netFD
deadline int64 // next deadline (nsec since 1970)
} }
func (s *pollServer) AddFD(fd *netFD, mode int) { func (s *pollServer) AddFD(fd *netFD, mode int) {
...@@ -103,10 +102,8 @@ func (s *pollServer) AddFD(fd *netFD, mode int) { ...@@ -103,10 +102,8 @@ func (s *pollServer) AddFD(fd *netFD, mode int) {
} }
return return
} }
if err := s.poll.AddFD(intfd, mode, false); err != nil {
panic("pollServer AddFD " + err.String()) s.Lock()
return
}
var t int64 var t int64
key := intfd << 1 key := intfd << 1
...@@ -119,11 +116,27 @@ func (s *pollServer) AddFD(fd *netFD, mode int) { ...@@ -119,11 +116,27 @@ func (s *pollServer) AddFD(fd *netFD, mode int) {
t = fd.wdeadline t = fd.wdeadline
} }
s.pending[key] = fd s.pending[key] = fd
doWakeup := false
if t > 0 && (s.deadline == 0 || t < s.deadline) { if t > 0 && (s.deadline == 0 || t < s.deadline) {
s.deadline = t s.deadline = t
doWakeup = true
}
if err := s.poll.AddFD(intfd, mode, false); err != nil {
panic("pollServer AddFD " + err.String())
}
s.Unlock()
if doWakeup {
s.Wakeup()
} }
} }
var wakeupbuf [1]byte
func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
func (s *pollServer) LookupFD(fd int, mode int) *netFD { func (s *pollServer) LookupFD(fd int, mode int) *netFD {
key := fd << 1 key := fd << 1
if mode == 'w' { if mode == 'w' {
...@@ -195,6 +208,8 @@ func (s *pollServer) CheckDeadlines() { ...@@ -195,6 +208,8 @@ func (s *pollServer) CheckDeadlines() {
func (s *pollServer) Run() { func (s *pollServer) Run() {
var scratch [100]byte var scratch [100]byte
s.Lock()
defer s.Unlock()
for { for {
var t = s.deadline var t = s.deadline
if t > 0 { if t > 0 {
...@@ -204,7 +219,7 @@ func (s *pollServer) Run() { ...@@ -204,7 +219,7 @@ func (s *pollServer) Run() {
continue continue
} }
} }
fd, mode, err := s.poll.WaitFD(t) fd, mode, err := s.poll.WaitFD(s, t)
if err != nil { if err != nil {
print("pollServer WaitFD: ", err.String(), "\n") print("pollServer WaitFD: ", err.String(), "\n")
return return
...@@ -219,18 +234,7 @@ func (s *pollServer) Run() { ...@@ -219,18 +234,7 @@ func (s *pollServer) Run() {
// but it's unlikely that there are more than // but it's unlikely that there are more than
// len(scratch) wakeup calls). // len(scratch) wakeup calls).
s.pr.Read(scratch[0:]) s.pr.Read(scratch[0:])
// Read from channels s.CheckDeadlines()
Update:
for {
select {
case fd := <-s.cr:
s.AddFD(fd, 'r')
case fd := <-s.cw:
s.AddFD(fd, 'w')
default:
break Update
}
}
} else { } else {
netfd := s.LookupFD(fd, mode) netfd := s.LookupFD(fd, mode)
if netfd == nil { if netfd == nil {
...@@ -242,19 +246,13 @@ func (s *pollServer) Run() { ...@@ -242,19 +246,13 @@ func (s *pollServer) Run() {
} }
} }
var wakeupbuf [1]byte
func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
func (s *pollServer) WaitRead(fd *netFD) { func (s *pollServer) WaitRead(fd *netFD) {
s.cr <- fd s.AddFD(fd, 'r')
s.Wakeup()
<-fd.cr <-fd.cr
} }
func (s *pollServer) WaitWrite(fd *netFD) { func (s *pollServer) WaitWrite(fd *netFD) {
s.cw <- fd s.AddFD(fd, 'w')
s.Wakeup()
<-fd.cw <-fd.cw
} }
......
...@@ -75,7 +75,7 @@ func (p *pollster) DelFD(fd int, mode int) { ...@@ -75,7 +75,7 @@ func (p *pollster) DelFD(fd int, mode int) {
syscall.Kevent(p.kq, events[0:], events[0:], nil) syscall.Kevent(p.kq, events[0:], events[0:], nil)
} }
func (p *pollster) WaitFD(nsec int64) (fd int, mode int, err os.Error) { func (p *pollster) WaitFD(s *pollServer, nsec int64) (fd int, mode int, err os.Error) {
var t *syscall.Timespec var t *syscall.Timespec
for len(p.events) == 0 { for len(p.events) == 0 {
if nsec > 0 { if nsec > 0 {
...@@ -84,7 +84,11 @@ func (p *pollster) WaitFD(nsec int64) (fd int, mode int, err os.Error) { ...@@ -84,7 +84,11 @@ func (p *pollster) WaitFD(nsec int64) (fd int, mode int, err os.Error) {
} }
*t = syscall.NsecToTimespec(nsec) *t = syscall.NsecToTimespec(nsec)
} }
s.Unlock()
nn, e := syscall.Kevent(p.kq, nil, p.eventbuf[0:], t) nn, e := syscall.Kevent(p.kq, nil, p.eventbuf[0:], t)
s.Lock()
if e != 0 { if e != 0 {
if e == syscall.EINTR { if e == syscall.EINTR {
continue continue
......
...@@ -71,7 +71,7 @@ func (p *pollster) DelFD(fd int, mode int) { ...@@ -71,7 +71,7 @@ func (p *pollster) DelFD(fd int, mode int) {
syscall.Kevent(p.kq, events[:], nil, nil) syscall.Kevent(p.kq, events[:], nil, nil)
} }
func (p *pollster) WaitFD(nsec int64) (fd int, mode int, err os.Error) { func (p *pollster) WaitFD(s *pollServer, nsec int64) (fd int, mode int, err os.Error) {
var t *syscall.Timespec var t *syscall.Timespec
for len(p.events) == 0 { for len(p.events) == 0 {
if nsec > 0 { if nsec > 0 {
...@@ -80,7 +80,11 @@ func (p *pollster) WaitFD(nsec int64) (fd int, mode int, err os.Error) { ...@@ -80,7 +80,11 @@ func (p *pollster) WaitFD(nsec int64) (fd int, mode int, err os.Error) {
} }
*t = syscall.NsecToTimespec(nsec) *t = syscall.NsecToTimespec(nsec)
} }
s.Unlock()
nn, e := syscall.Kevent(p.kq, nil, p.eventbuf[:], t) nn, e := syscall.Kevent(p.kq, nil, p.eventbuf[:], t)
s.Lock()
if e != 0 { if e != 0 {
if e == syscall.EINTR { if e == syscall.EINTR {
continue continue
......
...@@ -20,6 +20,7 @@ type pollster struct { ...@@ -20,6 +20,7 @@ type pollster struct {
epfd int epfd int
// Events we're already waiting for // Events we're already waiting for
// Must hold pollServer lock
events map[int]uint32 events map[int]uint32
} }
...@@ -38,6 +39,8 @@ func newpollster() (p *pollster, err os.Error) { ...@@ -38,6 +39,8 @@ func newpollster() (p *pollster, err os.Error) {
} }
func (p *pollster) AddFD(fd int, mode int, repeat bool) os.Error { func (p *pollster) AddFD(fd int, mode int, repeat bool) os.Error {
// pollServer is locked.
var ev syscall.EpollEvent var ev syscall.EpollEvent
var already bool var already bool
ev.Fd = int32(fd) ev.Fd = int32(fd)
...@@ -65,6 +68,8 @@ func (p *pollster) AddFD(fd int, mode int, repeat bool) os.Error { ...@@ -65,6 +68,8 @@ func (p *pollster) AddFD(fd int, mode int, repeat bool) os.Error {
} }
func (p *pollster) StopWaiting(fd int, bits uint) { func (p *pollster) StopWaiting(fd int, bits uint) {
// pollServer is locked.
events, already := p.events[fd] events, already := p.events[fd]
if !already { if !already {
print("Epoll unexpected fd=", fd, "\n") print("Epoll unexpected fd=", fd, "\n")
...@@ -98,6 +103,8 @@ func (p *pollster) StopWaiting(fd int, bits uint) { ...@@ -98,6 +103,8 @@ func (p *pollster) StopWaiting(fd int, bits uint) {
} }
func (p *pollster) DelFD(fd int, mode int) { func (p *pollster) DelFD(fd int, mode int) {
// pollServer is locked.
if mode == 'r' { if mode == 'r' {
p.StopWaiting(fd, readFlags) p.StopWaiting(fd, readFlags)
} else { } else {
...@@ -105,7 +112,9 @@ func (p *pollster) DelFD(fd int, mode int) { ...@@ -105,7 +112,9 @@ func (p *pollster) DelFD(fd int, mode int) {
} }
} }
func (p *pollster) WaitFD(nsec int64) (fd int, mode int, err os.Error) { func (p *pollster) WaitFD(s *pollServer, nsec int64) (fd int, mode int, err os.Error) {
s.Unlock()
// Get an event. // Get an event.
var evarray [1]syscall.EpollEvent var evarray [1]syscall.EpollEvent
ev := &evarray[0] ev := &evarray[0]
...@@ -117,6 +126,9 @@ func (p *pollster) WaitFD(nsec int64) (fd int, mode int, err os.Error) { ...@@ -117,6 +126,9 @@ func (p *pollster) WaitFD(nsec int64) (fd int, mode int, err os.Error) {
for e == syscall.EAGAIN || e == syscall.EINTR { for e == syscall.EAGAIN || e == syscall.EINTR {
n, e = syscall.EpollWait(p.epfd, evarray[0:], msec) n, e = syscall.EpollWait(p.epfd, evarray[0:], msec)
} }
s.Lock()
if e != 0 { if e != 0 {
return -1, 0, os.NewSyscallError("epoll_wait", e) return -1, 0, os.NewSyscallError("epoll_wait", e)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment