Commit b1ade9c8 authored by Matt Butcher's avatar Matt Butcher Committed by GitHub

Merge pull request #2314 from nebril/lock-release

Releases are locked to avoid parallel changes
parents 6e63a547 ec92b760
......@@ -19,6 +19,7 @@ package storage // import "k8s.io/helm/pkg/storage"
import (
"fmt"
"log"
"sync"
rspb "k8s.io/helm/pkg/proto/hapi/release"
relutil "k8s.io/helm/pkg/releaseutil"
......@@ -28,6 +29,11 @@ import (
// Storage represents a storage engine for a Release.
type Storage struct {
driver.Driver
// releaseLocks are for locking releases to make sure that only one operation at a time is executed on each release
releaseLocks map[string]*sync.Mutex
// releaseLocksLock is a mutex for accessing releaseLocks
releaseLocksLock *sync.Mutex
}
// Get retrieves the release from storage. An error is returned
......@@ -153,6 +159,51 @@ func (s *Storage) Last(name string) (*rspb.Release, error) {
return h[0], nil
}
// LockRelease gains a mutually exclusive access to a release via a mutex.
func (s *Storage) LockRelease(name string) error {
s.releaseLocksLock.Lock()
defer s.releaseLocksLock.Unlock()
var lock *sync.Mutex
lock, exists := s.releaseLocks[name]
if !exists {
releases, err := s.ListReleases()
if err != nil {
return err
}
found := false
for _, release := range releases {
if release.Name == name {
found = true
}
}
if !found {
return fmt.Errorf("Unable to lock release %s: release not found", name)
}
lock = &sync.Mutex{}
s.releaseLocks[name] = lock
}
lock.Lock()
return nil
}
// UnlockRelease releases a mutually exclusive access to a release.
// If release doesn't exist or wasn't previously locked - the unlock will pass
func (s *Storage) UnlockRelease(name string) {
s.releaseLocksLock.Lock()
defer s.releaseLocksLock.Unlock()
var lock *sync.Mutex
lock, exists := s.releaseLocks[name]
if !exists {
return
}
lock.Unlock()
}
// makeKey concatenates a release name and version into
// a string with format ```<release_name>#v<version>```.
// This key is used to uniquely identify storage objects.
......@@ -167,5 +218,9 @@ func Init(d driver.Driver) *Storage {
if d == nil {
d = driver.NewMemory()
}
return &Storage{Driver: d}
return &Storage{
Driver: d,
releaseLocks: make(map[string]*sync.Mutex),
releaseLocksLock: &sync.Mutex{},
}
}
......@@ -272,3 +272,31 @@ func assertErrNil(eh func(args ...interface{}), err error, message string) {
eh(fmt.Sprintf("%s: %q", message, err))
}
}
func TestReleaseLocksNotExist(t *testing.T) {
s := Init(driver.NewMemory())
err := s.LockRelease("no-such-release")
if err == nil {
t.Errorf("Exptected error when trying to lock non-existing release, got nil")
}
}
func TestReleaseLocks(t *testing.T) {
s := Init(driver.NewMemory())
releaseName := "angry-beaver"
rls := ReleaseTestData{
Name: releaseName,
Version: 1,
}.ToRelease()
s.Create(rls)
err := s.LockRelease(releaseName)
if err != nil {
t.Errorf("Exptected nil err when locking existing release")
}
s.UnlockRelease(releaseName)
}
......@@ -283,6 +283,12 @@ func (s *ReleaseServer) GetReleaseContent(c ctx.Context, req *services.GetReleas
// UpdateRelease takes an existing release and new information, and upgrades the release.
func (s *ReleaseServer) UpdateRelease(c ctx.Context, req *services.UpdateReleaseRequest) (*services.UpdateReleaseResponse, error) {
err := s.env.Releases.LockRelease(req.Name)
if err != nil {
return nil, err
}
defer s.env.Releases.UnlockRelease(req.Name)
currentRelease, updatedRelease, err := s.prepareUpdate(req)
if err != nil {
return nil, err
......@@ -465,6 +471,12 @@ func (s *ReleaseServer) prepareUpdate(req *services.UpdateReleaseRequest) (*rele
// RollbackRelease rolls back to a previous version of the given release.
func (s *ReleaseServer) RollbackRelease(c ctx.Context, req *services.RollbackReleaseRequest) (*services.RollbackReleaseResponse, error) {
err := s.env.Releases.LockRelease(req.Name)
if err != nil {
return nil, err
}
defer s.env.Releases.UnlockRelease(req.Name)
currentRelease, targetRelease, err := s.prepareRollback(req)
if err != nil {
return nil, err
......@@ -983,6 +995,12 @@ func (s *ReleaseServer) purgeReleases(rels ...*release.Release) error {
// UninstallRelease deletes all of the resources associated with this release, and marks the release DELETED.
func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallReleaseRequest) (*services.UninstallReleaseResponse, error) {
err := s.env.Releases.LockRelease(req.Name)
if err != nil {
return nil, err
}
defer s.env.Releases.UnlockRelease(req.Name)
if !ValidName.MatchString(req.Name) {
log.Printf("uninstall: Release not found: %s", req.Name)
return nil, errMissingRelease
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment