Unverified Commit 0d81d92a authored by Matthew Fisher's avatar Matthew Fisher Committed by GitHub

Merge pull request #5112 from mortent/WaitCRDEstablished

fix(helm): Wait for CRDs to reach established state for crd_install hook
parents 8494d133 b8e40a7c
...@@ -37,6 +37,7 @@ import ( ...@@ -37,6 +37,7 @@ import (
batch "k8s.io/api/batch/v1" batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
extv1beta1 "k8s.io/api/extensions/v1beta1" extv1beta1 "k8s.io/api/extensions/v1beta1"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
...@@ -45,6 +46,7 @@ import ( ...@@ -45,6 +46,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
...@@ -76,6 +78,12 @@ func New(getter genericclioptions.RESTClientGetter) *Client { ...@@ -76,6 +78,12 @@ func New(getter genericclioptions.RESTClientGetter) *Client {
if getter == nil { if getter == nil {
getter = genericclioptions.NewConfigFlags(true) getter = genericclioptions.NewConfigFlags(true)
} }
err := apiextv1beta1.AddToScheme(scheme.Scheme)
if err != nil {
panic(err)
}
return &Client{ return &Client{
Factory: cmdutil.NewFactory(getter), Factory: cmdutil.NewFactory(getter),
Log: nopLogger, Log: nopLogger,
...@@ -488,6 +496,55 @@ func (c *Client) WatchUntilReady(namespace string, reader io.Reader, timeout int ...@@ -488,6 +496,55 @@ func (c *Client) WatchUntilReady(namespace string, reader io.Reader, timeout int
return perform(infos, c.watchTimeout(time.Duration(timeout)*time.Second)) return perform(infos, c.watchTimeout(time.Duration(timeout)*time.Second))
} }
// WatchUntilCRDEstablished polls the given CRD until it reaches the established
// state. A CRD needs to reach the established state before CRs can be created.
//
// If a naming conflict condition is found, this function will return an error.
func (c *Client) WaitUntilCRDEstablished(reader io.Reader, timeout time.Duration) error {
infos, err := c.BuildUnstructured(metav1.NamespaceAll, reader)
if err != nil {
return err
}
return perform(infos, c.pollCRDEstablished(timeout))
}
func (c *Client) pollCRDEstablished(t time.Duration) ResourceActorFunc {
return func(info *resource.Info) error {
return c.pollCRDUntilEstablished(t, info)
}
}
func (c *Client) pollCRDUntilEstablished(timeout time.Duration, info *resource.Info) error {
return wait.PollImmediate(time.Second, timeout, func() (bool, error) {
err := info.Get()
if err != nil {
return false, fmt.Errorf("unable to get CRD: %v", err)
}
crd := &apiextv1beta1.CustomResourceDefinition{}
err = scheme.Scheme.Convert(info.Object, crd, nil)
if err != nil {
return false, fmt.Errorf("unable to convert to CRD type: %v", err)
}
for _, cond := range crd.Status.Conditions {
switch cond.Type {
case apiextv1beta1.Established:
if cond.Status == apiextv1beta1.ConditionTrue {
return true, nil
}
case apiextv1beta1.NamesAccepted:
if cond.Status == apiextv1beta1.ConditionFalse {
return false, fmt.Errorf("naming conflict detected for CRD %s", crd.GetName())
}
}
}
return false, nil
})
}
func perform(infos Result, fn ResourceActorFunc) error { func perform(infos Result, fn ResourceActorFunc) error {
if len(infos) == 0 { if len(infos) == 0 {
return ErrNoObjectsVisited return ErrNoObjectsVisited
......
...@@ -24,8 +24,10 @@ import ( ...@@ -24,8 +24,10 @@ import (
"sort" "sort"
"strings" "strings"
"testing" "testing"
"time"
v1 "k8s.io/api/core/v1" "k8s.io/api/core/v1"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
...@@ -33,15 +35,35 @@ import ( ...@@ -33,15 +35,35 @@ import (
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest/fake" "k8s.io/client-go/rest/fake"
cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing" cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
kubectlscheme "k8s.io/kubernetes/pkg/kubectl/scheme"
) )
func init() {
err := apiextv1beta1.AddToScheme(scheme.Scheme)
if err != nil {
panic(err)
}
// Tiller use the scheme from go-client, but the cmdtesting
// package used here is hardcoded to use the scheme from
// kubectl. So for testing, we need to add the CustomResourceDefinition
// type to both schemes.
err = apiextv1beta1.AddToScheme(kubectlscheme.Scheme)
if err != nil {
panic(err)
}
}
var ( var (
codec = scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
unstructuredSerializer = resource.UnstructuredPlusDefaultContentConfig().NegotiatedSerializer unstructuredSerializer = resource.UnstructuredPlusDefaultContentConfig().NegotiatedSerializer
) )
func getCodec() runtime.Codec {
return scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
}
func objBody(obj runtime.Object) io.ReadCloser { func objBody(obj runtime.Object) io.ReadCloser {
return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj)))) return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(getCodec(), obj))))
} }
func newPod(name string) v1.Pod { func newPod(name string) v1.Pod {
...@@ -103,7 +125,7 @@ func notFoundBody() *metav1.Status { ...@@ -103,7 +125,7 @@ func notFoundBody() *metav1.Status {
func newResponse(code int, obj runtime.Object) (*http.Response, error) { func newResponse(code int, obj runtime.Object) (*http.Response, error) {
header := http.Header{} header := http.Header{}
header.Set("Content-Type", runtime.ContentTypeJSON) header.Set("Content-Type", runtime.ContentTypeJSON)
body := ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj)))) body := ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(getCodec(), obj))))
return &http.Response{StatusCode: code, Header: header, Body: body}, nil return &http.Response{StatusCode: code, Header: header, Body: body}, nil
} }
...@@ -434,6 +456,88 @@ func TestResourceSortOrder(t *testing.T) { ...@@ -434,6 +456,88 @@ func TestResourceSortOrder(t *testing.T) {
} }
} }
func TestWaitUntilCRDEstablished(t *testing.T) {
testCases := map[string]struct {
conditions []apiextv1beta1.CustomResourceDefinitionCondition
returnConditionsAfter int
success bool
}{
"crd reaches established state after 2 requests": {
conditions: []apiextv1beta1.CustomResourceDefinitionCondition{
{
Type: apiextv1beta1.Established,
Status: apiextv1beta1.ConditionTrue,
},
},
returnConditionsAfter: 2,
success: true,
},
"crd does not reach established state before timeout": {
conditions: []apiextv1beta1.CustomResourceDefinitionCondition{},
returnConditionsAfter: 100,
success: false,
},
"crd name is not accepted": {
conditions: []apiextv1beta1.CustomResourceDefinitionCondition{
{
Type: apiextv1beta1.NamesAccepted,
Status: apiextv1beta1.ConditionFalse,
},
},
returnConditionsAfter: 1,
success: false,
},
}
for tn, tc := range testCases {
func(name string) {
c := newTestClient()
defer c.Cleanup()
crdWithoutConditions := newCrdWithStatus("name", apiextv1beta1.CustomResourceDefinitionStatus{})
crdWithConditions := newCrdWithStatus("name", apiextv1beta1.CustomResourceDefinitionStatus{
Conditions: tc.conditions,
})
requestCount := 0
c.TestFactory.UnstructuredClient = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Version: "v1"},
NegotiatedSerializer: unstructuredSerializer,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
var crd apiextv1beta1.CustomResourceDefinition
if requestCount < tc.returnConditionsAfter {
crd = crdWithoutConditions
} else {
crd = crdWithConditions
}
requestCount += 1
return newResponse(200, &crd)
}),
}
err := c.WaitUntilCRDEstablished(strings.NewReader(crdManifest), 5*time.Second)
if err != nil && tc.success {
t.Errorf("%s: expected no error, but got %v", name, err)
}
if err == nil && !tc.success {
t.Errorf("%s: expected error, but didn't get one", name)
}
}(tn)
}
}
func newCrdWithStatus(name string, status apiextv1beta1.CustomResourceDefinitionStatus) apiextv1beta1.CustomResourceDefinition {
crd := apiextv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: metav1.NamespaceDefault,
},
Spec: apiextv1beta1.CustomResourceDefinitionSpec{},
Status: status,
}
return crd
}
func TestPerform(t *testing.T) { func TestPerform(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
...@@ -701,3 +805,41 @@ spec: ...@@ -701,3 +805,41 @@ spec:
ports: ports:
- containerPort: 80 - containerPort: 80
` `
const crdManifest = `
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
creationTimestamp: null
labels:
controller-tools.k8s.io: "1.0"
name: applications.app.k8s.io
spec:
group: app.k8s.io
names:
kind: Application
plural: applications
scope: Namespaced
validation:
openAPIV3Schema:
properties:
apiVersion:
description: 'Description'
type: string
kind:
description: 'Kind'
type: string
metadata:
type: object
spec:
type: object
status:
type: object
version: v1beta1
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []
`
...@@ -144,6 +144,8 @@ type KubeClient interface { ...@@ -144,6 +144,8 @@ type KubeClient interface {
// WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
// and returns said phase (PodSucceeded or PodFailed qualify). // and returns said phase (PodSucceeded or PodFailed qualify).
WaitAndGetCompletedPodPhase(namespace string, reader io.Reader, timeout time.Duration) (v1.PodPhase, error) WaitAndGetCompletedPodPhase(namespace string, reader io.Reader, timeout time.Duration) (v1.PodPhase, error)
WaitUntilCRDEstablished(reader io.Reader, timeout time.Duration) error
} }
// PrintingKubeClient implements KubeClient, but simply prints the reader to // PrintingKubeClient implements KubeClient, but simply prints the reader to
...@@ -210,6 +212,11 @@ func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(namespace string, reade ...@@ -210,6 +212,11 @@ func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(namespace string, reade
return v1.PodUnknown, err return v1.PodUnknown, err
} }
func (p *PrintingKubeClient) WaitUntilCRDEstablished(reader io.Reader, timeout time.Duration) error {
_, err := io.Copy(p.Out, reader)
return err
}
// Environment provides the context for executing a client request. // Environment provides the context for executing a client request.
// //
// All services in a context are concurrency safe. // All services in a context are concurrency safe.
......
...@@ -72,6 +72,10 @@ func (k *mockKubeClient) WaitAndGetCompletedPodStatus(namespace string, reader i ...@@ -72,6 +72,10 @@ func (k *mockKubeClient) WaitAndGetCompletedPodStatus(namespace string, reader i
return "", nil return "", nil
} }
func (k *mockKubeClient) WaitUntilCRDEstablished(reader io.Reader, timeout time.Duration) error {
return nil
}
var _ Engine = &mockEngine{} var _ Engine = &mockEngine{}
var _ KubeClient = &mockKubeClient{} var _ KubeClient = &mockKubeClient{}
var _ KubeClient = &PrintingKubeClient{} var _ KubeClient = &PrintingKubeClient{}
......
...@@ -23,6 +23,7 @@ import ( ...@@ -23,6 +23,7 @@ import (
"path" "path"
"regexp" "regexp"
"strings" "strings"
"time"
"github.com/technosophos/moniker" "github.com/technosophos/moniker"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
...@@ -399,7 +400,7 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin ...@@ -399,7 +400,7 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin
b.Reset() b.Reset()
b.WriteString(h.Manifest) b.WriteString(h.Manifest)
// We can't watch CRDs // We can't watch CRDs, but need to wait until they reach the established state before continuing
if hook != hooks.CRDInstall { if hook != hooks.CRDInstall {
if err := kubeCli.WatchUntilReady(namespace, b, timeout, false); err != nil { if err := kubeCli.WatchUntilReady(namespace, b, timeout, false); err != nil {
s.Log("warning: Release %s %s %s could not complete: %s", name, hook, h.Path, err) s.Log("warning: Release %s %s %s could not complete: %s", name, hook, h.Path, err)
...@@ -410,6 +411,11 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin ...@@ -410,6 +411,11 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin
} }
return err return err
} }
} else {
if err := kubeCli.WaitUntilCRDEstablished(b, time.Duration(timeout)*time.Second); err != nil {
s.Log("warning: Release %s %s %s could not complete: %s", name, hook, h.Path, err)
return err
}
} }
} }
......
...@@ -654,6 +654,10 @@ func (kc *mockHooksKubeClient) WaitAndGetCompletedPodPhase(namespace string, rea ...@@ -654,6 +654,10 @@ func (kc *mockHooksKubeClient) WaitAndGetCompletedPodPhase(namespace string, rea
return v1.PodUnknown, nil return v1.PodUnknown, nil
} }
func (kc *mockHooksKubeClient) WaitUntilCRDEstablished(reader io.Reader, timeout time.Duration) error {
return nil
}
func deletePolicyStub(kubeClient *mockHooksKubeClient) *ReleaseServer { func deletePolicyStub(kubeClient *mockHooksKubeClient) *ReleaseServer {
e := environment.New() e := environment.New()
e.Releases = storage.Init(driver.NewMemory()) e.Releases = storage.Init(driver.NewMemory())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment