Commit 03d641b9 authored by Russ Cox's avatar Russ Cox

net: break up >1GB reads and writes on stream connections

Also fix behavior of Buffers.WriteTo when writev returns an error.

Fixes #16266.

Change-Id: Idc9503408ce2cb460663768fab86035cbab11aef
Reviewed-on: https://go-review.googlesource.com/31584
Run-TryBot: Russ Cox <rsc@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: 's avatarBrad Fitzpatrick <bradfitz@golang.org>
Reviewed-by: 's avatarIan Lance Taylor <iant@golang.org>
parent 21a1fe4d
...@@ -22,6 +22,7 @@ type netFD struct { ...@@ -22,6 +22,7 @@ type netFD struct {
dir string dir string
listen, ctl, data *os.File listen, ctl, data *os.File
laddr, raddr Addr laddr, raddr Addr
isStream bool
} }
var ( var (
......
...@@ -24,6 +24,7 @@ type netFD struct { ...@@ -24,6 +24,7 @@ type netFD struct {
sysfd int sysfd int
family int family int
sotype int sotype int
isStream bool
isConnected bool isConnected bool
net string net string
laddr Addr laddr Addr
...@@ -40,7 +41,7 @@ func sysInit() { ...@@ -40,7 +41,7 @@ func sysInit() {
} }
func newFD(sysfd, family, sotype int, net string) (*netFD, error) { func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
return &netFD{sysfd: sysfd, family: family, sotype: sotype, net: net}, nil return &netFD{sysfd: sysfd, family: family, sotype: sotype, net: net, isStream: sotype == syscall.SOCK_STREAM}, nil
} }
func (fd *netFD) init() error { func (fd *netFD) init() error {
...@@ -238,6 +239,9 @@ func (fd *netFD) Read(p []byte) (n int, err error) { ...@@ -238,6 +239,9 @@ func (fd *netFD) Read(p []byte) (n int, err error) {
if err := fd.pd.prepareRead(); err != nil { if err := fd.pd.prepareRead(); err != nil {
return 0, err return 0, err
} }
if fd.isStream && len(p) > 1<<30 {
p = p[:1<<30]
}
for { for {
n, err = syscall.Read(fd.sysfd, p) n, err = syscall.Read(fd.sysfd, p)
if err != nil { if err != nil {
...@@ -321,7 +325,11 @@ func (fd *netFD) Write(p []byte) (nn int, err error) { ...@@ -321,7 +325,11 @@ func (fd *netFD) Write(p []byte) (nn int, err error) {
} }
for { for {
var n int var n int
n, err = syscall.Write(fd.sysfd, p[nn:]) max := len(p)
if fd.isStream && max-nn > 1<<30 {
max = nn + 1<<30
}
n, err = syscall.Write(fd.sysfd, p[nn:max])
if n > 0 { if n > 0 {
nn += n nn += n
} }
......
...@@ -239,6 +239,7 @@ type netFD struct { ...@@ -239,6 +239,7 @@ type netFD struct {
sysfd syscall.Handle sysfd syscall.Handle
family int family int
sotype int sotype int
isStream bool
isConnected bool isConnected bool
skipSyncNotif bool skipSyncNotif bool
net string net string
...@@ -257,7 +258,7 @@ func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) ...@@ -257,7 +258,7 @@ func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error)
return nil, initErr return nil, initErr
} }
onceStartServer.Do(startServer) onceStartServer.Do(startServer)
return &netFD{sysfd: sysfd, family: family, sotype: sotype, net: net}, nil return &netFD{sysfd: sysfd, family: family, sotype: sotype, net: net, isStream: sotype == syscall.SOCK_STREAM}, nil
} }
func (fd *netFD) init() error { func (fd *netFD) init() error {
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
package net package net
import ( import (
"fmt"
"internal/testenv" "internal/testenv"
"io" "io"
"reflect" "reflect"
...@@ -654,3 +655,58 @@ func TestTCPSelfConnect(t *testing.T) { ...@@ -654,3 +655,58 @@ func TestTCPSelfConnect(t *testing.T) {
} }
} }
} }
// Test that >32-bit reads work on 64-bit systems.
// On 32-bit systems this tests that maxint reads work.
func TestTCPBig(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode")
}
for _, writev := range []bool{false, true} {
t.Run(fmt.Sprintf("writev=%v", writev), func(t *testing.T) {
ln, err := newLocalListener("tcp")
if err != nil {
t.Fatal(err)
}
defer ln.Close()
x := int(1 << 30)
x = x*5 + 1<<20 // just over 5 GB on 64-bit, just over 1GB on 32-bit
done := make(chan int)
go func() {
defer close(done)
c, err := ln.Accept()
if err != nil {
t.Error(err)
return
}
buf := make([]byte, x)
var n int
if writev {
var n64 int64
n64, err = (&Buffers{buf}).WriteTo(c)
n = int(n64)
} else {
n, err = c.Write(buf)
}
if n != len(buf) || err != nil {
t.Errorf("Write(buf) = %d, %v, want %d, nil", n, err, x)
}
c.Close()
}()
c, err := Dial("tcp", ln.Addr().String())
if err != nil {
t.Fatal(err)
}
buf := make([]byte, x)
n, err := io.ReadFull(c, buf)
if n != len(buf) || err != nil {
t.Errorf("Read(buf) = %d, %v, want %d, nil", n, err, x)
}
c.Close()
<-done
})
}
}
...@@ -169,3 +169,44 @@ func testBuffer_writeTo(t *testing.T, chunks int, useCopy bool) { ...@@ -169,3 +169,44 @@ func testBuffer_writeTo(t *testing.T, chunks int, useCopy bool) {
return nil return nil
}) })
} }
func TestWritevError(t *testing.T) {
ln, err := newLocalListener("tcp")
if err != nil {
t.Fatal(err)
}
defer ln.Close()
ch := make(chan Conn, 1)
go func() {
defer close(ch)
c, err := ln.Accept()
if err != nil {
t.Error(err)
return
}
ch <- c
}()
c1, err := Dial("tcp", ln.Addr().String())
if err != nil {
t.Fatal(err)
}
defer c1.Close()
c2 := <-ch
if c2 == nil {
t.Fatal("no server side connection")
}
c2.Close()
// 1 GB of data should be enough to notice the connection is gone.
// Just a few bytes is not enough.
// Arrange to reuse the same 1 MB buffer so that we don't allocate much.
buf := make([]byte, 1<<20)
buffers := make(Buffers, 1<<10)
for i := range buffers {
buffers[i] = buf
}
if _, err := buffers.WriteTo(c1); err == nil {
t.Fatalf("Buffers.WriteTo(closed conn) succeeded, want error", err)
}
}
...@@ -49,6 +49,10 @@ func (fd *netFD) writeBuffers(v *Buffers) (n int64, err error) { ...@@ -49,6 +49,10 @@ func (fd *netFD) writeBuffers(v *Buffers) (n int64, err error) {
continue continue
} }
iovecs = append(iovecs, syscall.Iovec{Base: &chunk[0]}) iovecs = append(iovecs, syscall.Iovec{Base: &chunk[0]})
if fd.isStream && len(chunk) > 1<<30 {
iovecs[len(iovecs)-1].SetLen(1 << 30)
break // continue chunk on next writev
}
iovecs[len(iovecs)-1].SetLen(len(chunk)) iovecs[len(iovecs)-1].SetLen(len(chunk))
if len(iovecs) == maxVec { if len(iovecs) == maxVec {
break break
...@@ -63,7 +67,7 @@ func (fd *netFD) writeBuffers(v *Buffers) (n int64, err error) { ...@@ -63,7 +67,7 @@ func (fd *netFD) writeBuffers(v *Buffers) (n int64, err error) {
uintptr(fd.sysfd), uintptr(fd.sysfd),
uintptr(unsafe.Pointer(&iovecs[0])), uintptr(unsafe.Pointer(&iovecs[0])),
uintptr(len(iovecs))) uintptr(len(iovecs)))
if wrote < 0 { if wrote == ^uintptr(0) {
wrote = 0 wrote = 0
} }
testHookDidWritev(int(wrote)) testHookDidWritev(int(wrote))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment