Unverified Commit 3630a532 authored by Martin Hickey's avatar Martin Hickey Committed by GitHub

Merge pull request #5678 from pdecat/fix-helm-repo-add-concurrent

Fix issues with concurrent `helm repo add` commands
parents 2a344c0e 79a190a6
...@@ -17,16 +17,20 @@ limitations under the License. ...@@ -17,16 +17,20 @@ limitations under the License.
package main package main
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"syscall"
"time"
"golang.org/x/crypto/ssh/terminal"
"github.com/gofrs/flock"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"golang.org/x/crypto/ssh/terminal"
"k8s.io/helm/pkg/getter" "k8s.io/helm/pkg/getter"
"k8s.io/helm/pkg/helm/helmpath" "k8s.io/helm/pkg/helm/helmpath"
"k8s.io/helm/pkg/repo" "k8s.io/helm/pkg/repo"
"syscall"
) )
type repoAddCmd struct { type repoAddCmd struct {
...@@ -131,6 +135,25 @@ func addRepository(name, url, username, password string, home helmpath.Home, cer ...@@ -131,6 +135,25 @@ func addRepository(name, url, username, password string, home helmpath.Home, cer
return fmt.Errorf("Looks like %q is not a valid chart repository or cannot be reached: %s", url, err.Error()) return fmt.Errorf("Looks like %q is not a valid chart repository or cannot be reached: %s", url, err.Error())
} }
// Lock the repository file for concurrent goroutines or processes synchronization
fileLock := flock.New(home.RepositoryFile())
lockCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
locked, err := fileLock.TryLockContext(lockCtx, time.Second)
if err == nil && locked {
defer fileLock.Unlock()
}
if err != nil {
return err
}
// Re-read the repositories file before updating it as its content may have been changed
// by a concurrent execution after the first read and before being locked
f, err = repo.LoadRepositoriesFile(home.RepositoryFile())
if err != nil {
return err
}
f.Update(&c) f.Update(&c)
return f.WriteFile(home.RepositoryFile(), 0644) return f.WriteFile(home.RepositoryFile(), 0644)
......
...@@ -17,13 +17,18 @@ limitations under the License. ...@@ -17,13 +17,18 @@ limitations under the License.
package main package main
import ( import (
"fmt"
"io" "io"
"os" "os"
"os/exec"
"strings"
"sync"
"testing" "testing"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"k8s.io/helm/pkg/helm" "k8s.io/helm/pkg/helm"
"k8s.io/helm/pkg/helm/helmpath"
"k8s.io/helm/pkg/repo" "k8s.io/helm/pkg/repo"
"k8s.io/helm/pkg/repo/repotest" "k8s.io/helm/pkg/repo/repotest"
) )
...@@ -101,3 +106,111 @@ func TestRepoAdd(t *testing.T) { ...@@ -101,3 +106,111 @@ func TestRepoAdd(t *testing.T) {
t.Errorf("Duplicate repository name was added") t.Errorf("Duplicate repository name was added")
} }
} }
func TestRepoAddConcurrentGoRoutines(t *testing.T) {
ts, thome, err := repotest.NewTempServer("testdata/testserver/*.*")
if err != nil {
t.Fatal(err)
}
cleanup := resetEnv()
defer func() {
ts.Stop()
os.RemoveAll(thome.String())
cleanup()
}()
settings.Home = thome
if err := ensureTestHome(settings.Home, t); err != nil {
t.Fatal(err)
}
var wg sync.WaitGroup
wg.Add(3)
for i := 0; i < 3; i++ {
go func(name string) {
defer wg.Done()
if err := addRepository(name, ts.URL(), "", "", settings.Home, "", "", "", true); err != nil {
t.Error(err)
}
}(fmt.Sprintf("%s-%d", testName, i))
}
wg.Wait()
f, err := repo.LoadRepositoriesFile(settings.Home.RepositoryFile())
if err != nil {
t.Error(err)
}
var name string
for i := 0; i < 3; i++ {
name = fmt.Sprintf("%s-%d", testName, i)
if !f.Has(name) {
t.Errorf("%s was not successfully inserted into %s", name, settings.Home.RepositoryFile())
}
}
}
// Same as TestRepoAddConcurrentGoRoutines but with repository additions in sub-processes
func TestRepoAddConcurrentSubProcesses(t *testing.T) {
goWantHelperProcess := os.Getenv("GO_WANT_HELPER_PROCESS")
if goWantHelperProcess == "" {
// parent
ts, thome, err := repotest.NewTempServer("testdata/testserver/*.*")
if err != nil {
t.Fatal(err)
}
settings.Home = thome
cleanup := resetEnv()
defer func() {
ts.Stop()
os.RemoveAll(thome.String())
cleanup()
}()
if err := ensureTestHome(settings.Home, t); err != nil {
t.Fatal(err)
}
var wg sync.WaitGroup
wg.Add(2)
for i := 0; i < 2; i++ {
go func(name string) {
defer wg.Done()
cmd := exec.Command(os.Args[0], "-test.run=^TestRepoAddConcurrentSubProcesses$")
cmd.Env = append(os.Environ(), fmt.Sprintf("GO_WANT_HELPER_PROCESS=%s,%s", name, ts.URL()), fmt.Sprintf("HELM_HOME=%s", settings.Home))
out, err := cmd.CombinedOutput()
if len(out) > 0 || err != nil {
t.Fatalf("child process: %q, %v", out, err)
}
}(fmt.Sprintf("%s-%d", testName, i))
}
wg.Wait()
f, err := repo.LoadRepositoriesFile(settings.Home.RepositoryFile())
if err != nil {
t.Error(err)
}
var name string
for i := 0; i < 2; i++ {
name = fmt.Sprintf("%s-%d", testName, i)
if !f.Has(name) {
t.Errorf("%s was not successfully inserted into %s", name, settings.Home.RepositoryFile())
}
}
} else {
// child
s := strings.Split(goWantHelperProcess, ",")
settings.Home = helmpath.Home(os.Getenv("HELM_HOME"))
repoName := s[0]
tsURL := s[1]
if err := addRepository(repoName, tsURL, "", "", settings.Home, "", "", "", true); err != nil {
t.Fatal(err)
}
os.Exit(0)
}
}
...@@ -114,6 +114,8 @@ imports: ...@@ -114,6 +114,8 @@ imports:
- syntax/lexer - syntax/lexer
- util/runes - util/runes
- util/strings - util/strings
- name: github.com/gofrs/flock
version: 392e7fae8f1b0bdbd67dad7237d23f618feb6dbb
- name: github.com/gogo/protobuf - name: github.com/gogo/protobuf
version: 342cbe0a04158f6dcb03ca0079991a51a4248c02 version: 342cbe0a04158f6dcb03ca0079991a51a4248c02
subpackages: subpackages:
......
...@@ -67,6 +67,8 @@ import: ...@@ -67,6 +67,8 @@ import:
- package: github.com/jmoiron/sqlx - package: github.com/jmoiron/sqlx
version: ^1.2.0 version: ^1.2.0
- package: github.com/rubenv/sql-migrate - package: github.com/rubenv/sql-migrate
- package: github.com/gofrs/flock
version: v0.7.1
testImports: testImports:
- package: github.com/stretchr/testify - package: github.com/stretchr/testify
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment