Commit ec92b760 authored by Maciej Kwiek's avatar Maciej Kwiek

Release are locked to avoid parallel changes

Environment is supplied with release lock map which allows to lock a
release by name to make sure that update, rollback or uninstall aren't
running on one release at the same time.
parent 7ce0e27c
...@@ -19,6 +19,7 @@ package storage // import "k8s.io/helm/pkg/storage" ...@@ -19,6 +19,7 @@ package storage // import "k8s.io/helm/pkg/storage"
import ( import (
"fmt" "fmt"
"log" "log"
"sync"
rspb "k8s.io/helm/pkg/proto/hapi/release" rspb "k8s.io/helm/pkg/proto/hapi/release"
relutil "k8s.io/helm/pkg/releaseutil" relutil "k8s.io/helm/pkg/releaseutil"
...@@ -28,6 +29,11 @@ import ( ...@@ -28,6 +29,11 @@ import (
// Storage represents a storage engine for a Release. // Storage represents a storage engine for a Release.
type Storage struct { type Storage struct {
driver.Driver driver.Driver
// releaseLocks are for locking releases to make sure that only one operation at a time is executed on each release
releaseLocks map[string]*sync.Mutex
// releaseLocksLock is a mutex for accessing releaseLocks
releaseLocksLock *sync.Mutex
} }
// Get retrieves the release from storage. An error is returned // Get retrieves the release from storage. An error is returned
...@@ -153,6 +159,51 @@ func (s *Storage) Last(name string) (*rspb.Release, error) { ...@@ -153,6 +159,51 @@ func (s *Storage) Last(name string) (*rspb.Release, error) {
return h[0], nil return h[0], nil
} }
// LockRelease gains a mutually exclusive access to a release via a mutex.
func (s *Storage) LockRelease(name string) error {
s.releaseLocksLock.Lock()
defer s.releaseLocksLock.Unlock()
var lock *sync.Mutex
lock, exists := s.releaseLocks[name]
if !exists {
releases, err := s.ListReleases()
if err != nil {
return err
}
found := false
for _, release := range releases {
if release.Name == name {
found = true
}
}
if !found {
return fmt.Errorf("Unable to lock release %s: release not found", name)
}
lock = &sync.Mutex{}
s.releaseLocks[name] = lock
}
lock.Lock()
return nil
}
// UnlockRelease releases a mutually exclusive access to a release.
// If release doesn't exist or wasn't previously locked - the unlock will pass
func (s *Storage) UnlockRelease(name string) {
s.releaseLocksLock.Lock()
defer s.releaseLocksLock.Unlock()
var lock *sync.Mutex
lock, exists := s.releaseLocks[name]
if !exists {
return
}
lock.Unlock()
}
// makeKey concatenates a release name and version into // makeKey concatenates a release name and version into
// a string with format ```<release_name>#v<version>```. // a string with format ```<release_name>#v<version>```.
// This key is used to uniquely identify storage objects. // This key is used to uniquely identify storage objects.
...@@ -167,5 +218,9 @@ func Init(d driver.Driver) *Storage { ...@@ -167,5 +218,9 @@ func Init(d driver.Driver) *Storage {
if d == nil { if d == nil {
d = driver.NewMemory() d = driver.NewMemory()
} }
return &Storage{Driver: d} return &Storage{
Driver: d,
releaseLocks: make(map[string]*sync.Mutex),
releaseLocksLock: &sync.Mutex{},
}
} }
...@@ -272,3 +272,31 @@ func assertErrNil(eh func(args ...interface{}), err error, message string) { ...@@ -272,3 +272,31 @@ func assertErrNil(eh func(args ...interface{}), err error, message string) {
eh(fmt.Sprintf("%s: %q", message, err)) eh(fmt.Sprintf("%s: %q", message, err))
} }
} }
func TestReleaseLocksNotExist(t *testing.T) {
s := Init(driver.NewMemory())
err := s.LockRelease("no-such-release")
if err == nil {
t.Errorf("Exptected error when trying to lock non-existing release, got nil")
}
}
func TestReleaseLocks(t *testing.T) {
s := Init(driver.NewMemory())
releaseName := "angry-beaver"
rls := ReleaseTestData{
Name: releaseName,
Version: 1,
}.ToRelease()
s.Create(rls)
err := s.LockRelease(releaseName)
if err != nil {
t.Errorf("Exptected nil err when locking existing release")
}
s.UnlockRelease(releaseName)
}
...@@ -283,6 +283,12 @@ func (s *ReleaseServer) GetReleaseContent(c ctx.Context, req *services.GetReleas ...@@ -283,6 +283,12 @@ func (s *ReleaseServer) GetReleaseContent(c ctx.Context, req *services.GetReleas
// UpdateRelease takes an existing release and new information, and upgrades the release. // UpdateRelease takes an existing release and new information, and upgrades the release.
func (s *ReleaseServer) UpdateRelease(c ctx.Context, req *services.UpdateReleaseRequest) (*services.UpdateReleaseResponse, error) { func (s *ReleaseServer) UpdateRelease(c ctx.Context, req *services.UpdateReleaseRequest) (*services.UpdateReleaseResponse, error) {
err := s.env.Releases.LockRelease(req.Name)
if err != nil {
return nil, err
}
defer s.env.Releases.UnlockRelease(req.Name)
currentRelease, updatedRelease, err := s.prepareUpdate(req) currentRelease, updatedRelease, err := s.prepareUpdate(req)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -465,6 +471,12 @@ func (s *ReleaseServer) prepareUpdate(req *services.UpdateReleaseRequest) (*rele ...@@ -465,6 +471,12 @@ func (s *ReleaseServer) prepareUpdate(req *services.UpdateReleaseRequest) (*rele
// RollbackRelease rolls back to a previous version of the given release. // RollbackRelease rolls back to a previous version of the given release.
func (s *ReleaseServer) RollbackRelease(c ctx.Context, req *services.RollbackReleaseRequest) (*services.RollbackReleaseResponse, error) { func (s *ReleaseServer) RollbackRelease(c ctx.Context, req *services.RollbackReleaseRequest) (*services.RollbackReleaseResponse, error) {
err := s.env.Releases.LockRelease(req.Name)
if err != nil {
return nil, err
}
defer s.env.Releases.UnlockRelease(req.Name)
currentRelease, targetRelease, err := s.prepareRollback(req) currentRelease, targetRelease, err := s.prepareRollback(req)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -983,6 +995,12 @@ func (s *ReleaseServer) purgeReleases(rels ...*release.Release) error { ...@@ -983,6 +995,12 @@ func (s *ReleaseServer) purgeReleases(rels ...*release.Release) error {
// UninstallRelease deletes all of the resources associated with this release, and marks the release DELETED. // UninstallRelease deletes all of the resources associated with this release, and marks the release DELETED.
func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallReleaseRequest) (*services.UninstallReleaseResponse, error) { func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallReleaseRequest) (*services.UninstallReleaseResponse, error) {
err := s.env.Releases.LockRelease(req.Name)
if err != nil {
return nil, err
}
defer s.env.Releases.UnlockRelease(req.Name)
if !ValidName.MatchString(req.Name) { if !ValidName.MatchString(req.Name) {
log.Printf("uninstall: Release not found: %s", req.Name) log.Printf("uninstall: Release not found: %s", req.Name)
return nil, errMissingRelease return nil, errMissingRelease
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment