Commit 7ef9bb6f authored by Taylor Thomas's avatar Taylor Thomas

feat(*): Add --wait flag

Adds `--wait` flag to helm that waits for all pods to reach a ready
state, PVCs to be bound, and services to have IP addresses

Closes #1805
parent 7389b341
......@@ -194,6 +194,9 @@ message UpdateReleaseRequest {
int64 timeout = 7;
// ResetValues will cause Tiller to ignore stored values, resetting to default values.
bool reset_values = 8;
// wait, if true, will wait until all Pods, PVCs, and Services are in a ready state
// before marking the release as successful. It will wait for as long as timeout
bool wait = 9;
}
// UpdateReleaseResponse is the response to an update request.
......@@ -214,6 +217,9 @@ message RollbackReleaseRequest {
bool recreate = 5;
// timeout specifies the max amount of time any kubernetes client command can run.
int64 timeout = 6;
// wait, if true, will wait until all Pods, PVCs, and Services are in a ready state
// before marking the release as successful. It will wait for as long as timeout
bool wait = 7;
}
// RollbackReleaseResponse is the response to an update request.
......@@ -248,6 +254,9 @@ message InstallReleaseRequest {
// timeout specifies the max amount of time any kubernetes client command can run.
int64 timeout = 8;
// wait, if true, will wait until all Pods, PVCs, and Services are in a ready state
// before marking the release as successful. It will wait for as long as timeout
bool wait = 9;
}
// InstallReleaseResponse is the response from a release installation.
......
......@@ -112,6 +112,7 @@ type installCmd struct {
nameTemplate string
version string
timeout int64
wait bool
}
type valueFiles []string
......@@ -169,6 +170,7 @@ func newInstallCmd(c helm.Interface, out io.Writer) *cobra.Command {
f.StringVar(&inst.keyring, "keyring", defaultKeyring(), "location of public keys used for verification")
f.StringVar(&inst.version, "version", "", "specify the exact chart version to install. If this is not specified, the latest version is installed")
f.Int64Var(&inst.timeout, "timeout", 300, "time in seconds to wait for any individual kubernetes operation (like Jobs for hooks)")
f.BoolVar(&inst.wait, "wait", false, "if set, will wait until all Pods, PVCs, and Services are in a ready state before marking the release as successful. It will wait for as long as --timeout")
return cmd
}
......@@ -205,7 +207,8 @@ func (i *installCmd) run() error {
helm.InstallDryRun(i.dryRun),
helm.InstallReuseName(i.replace),
helm.InstallDisableHooks(i.disableHooks),
helm.InstallTimeout(i.timeout))
helm.InstallTimeout(i.timeout),
helm.InstallWait(i.wait))
if err != nil {
return prettyError(err)
}
......
......@@ -98,6 +98,14 @@ func TestInstall(t *testing.T) {
expected: "foobar",
resp: releaseMock(&releaseOptions{name: "foobar"}),
},
// Install, with wait
{
name: "install with a wait",
args: []string{"testdata/testcharts/alpine"},
flags: strings.Split("--wait", " "),
expected: "apollo",
resp: releaseMock(&releaseOptions{name: "apollo"}),
},
// Install, using the name-template
{
name: "install with name-template",
......
......@@ -43,6 +43,7 @@ type rollbackCmd struct {
out io.Writer
client helm.Interface
timeout int64
wait bool
}
func newRollbackCmd(c helm.Interface, out io.Writer) *cobra.Command {
......@@ -79,6 +80,7 @@ func newRollbackCmd(c helm.Interface, out io.Writer) *cobra.Command {
f.BoolVar(&rollback.recreate, "recreate-pods", false, "performs pods restart for the resource if applicable")
f.BoolVar(&rollback.disableHooks, "no-hooks", false, "prevent hooks from running during rollback")
f.Int64Var(&rollback.timeout, "timeout", 300, "time in seconds to wait for any individual kubernetes operation (like Jobs for hooks)")
f.BoolVar(&rollback.wait, "wait", false, "if set, will wait until all Pods, PVCs, and Services are in a ready state before marking the release as successful. It will wait for as long as --timeout")
return cmd
}
......@@ -90,7 +92,8 @@ func (r *rollbackCmd) run() error {
helm.RollbackRecreate(r.recreate),
helm.RollbackDisableHooks(r.disableHooks),
helm.RollbackVersion(r.revision),
helm.RollbackTimeout(r.timeout))
helm.RollbackTimeout(r.timeout),
helm.RollbackWait(r.wait))
if err != nil {
return prettyError(err)
}
......
......@@ -37,6 +37,12 @@ func TestRollbackCmd(t *testing.T) {
flags: []string{"--timeout", "120"},
expected: "Rollback was a success! Happy Helming!",
},
{
name: "rollback a release with wait",
args: []string{"funny-honey", "1"},
flags: []string{"--wait"},
expected: "Rollback was a success! Happy Helming!",
},
{
name: "rollback a release without revision",
args: []string{"funny-honey"},
......
......@@ -71,6 +71,7 @@ type upgradeCmd struct {
version string
timeout int64
resetValues bool
wait bool
}
func newUpgradeCmd(client helm.Interface, out io.Writer) *cobra.Command {
......@@ -112,6 +113,7 @@ func newUpgradeCmd(client helm.Interface, out io.Writer) *cobra.Command {
f.StringVar(&upgrade.version, "version", "", "specify the exact chart version to use. If this is not specified, the latest version is used")
f.Int64Var(&upgrade.timeout, "timeout", 300, "time in seconds to wait for any individual kubernetes operation (like Jobs for hooks)")
f.BoolVar(&upgrade.resetValues, "reset-values", false, "when upgrading, reset the values to the ones built into the chart")
f.BoolVar(&upgrade.wait, "wait", false, "if set, will wait until all Pods, PVCs, and Services are in a ready state before marking the release as successful. It will wait for as long as --timeout")
f.MarkDeprecated("disable-hooks", "use --no-hooks instead")
......@@ -147,6 +149,7 @@ func (u *upgradeCmd) run() error {
values: u.values,
namespace: u.namespace,
timeout: u.timeout,
wait: u.wait,
}
return ic.run()
}
......@@ -165,7 +168,8 @@ func (u *upgradeCmd) run() error {
helm.UpgradeRecreate(u.recreate),
helm.UpgradeDisableHooks(u.disableHooks),
helm.UpgradeTimeout(u.timeout),
helm.ResetValues(u.resetValues))
helm.ResetValues(u.resetValues),
helm.UpgradeWait(u.wait))
if err != nil {
return fmt.Errorf("UPGRADE FAILED: %v", prettyError(err))
}
......
......@@ -114,6 +114,13 @@ func TestUpgradeCmd(t *testing.T) {
resp: releaseMock(&releaseOptions{name: "crazy-bunny", version: 1, chart: ch}),
expected: "Release \"crazy-bunny\" has been upgraded. Happy Helming!\n",
},
{
name: "upgrade a release with wait",
args: []string{"crazy-bunny", chartPath},
flags: []string{"--wait"},
resp: releaseMock(&releaseOptions{name: "crazy-bunny", version: 2, chart: ch2}),
expected: "Release \"crazy-bunny\" has been upgraded. Happy Helming!\n",
},
}
cmd := func(c *fakeReleaseClient, out io.Writer) *cobra.Command {
......
......@@ -181,6 +181,27 @@ func RollbackTimeout(timeout int64) RollbackOption {
}
}
// InstallWait specifies whether or not to wait for all resources to be ready
func InstallWait(wait bool) InstallOption {
return func(opts *options) {
opts.instReq.Wait = wait
}
}
// UpgradeWait specifies whether or not to wait for all resources to be ready
func UpgradeWait(wait bool) UpdateOption {
return func(opts *options) {
opts.updateReq.Wait = wait
}
}
// RollbackWait specifies whether or not to wait for all resources to be ready
func RollbackWait(wait bool) RollbackOption {
return func(opts *options) {
opts.rollbackReq.Wait = wait
}
}
// UpdateValueOverrides specifies a list of values to include when upgrading
func UpdateValueOverrides(raw []byte) UpdateOption {
return func(opts *options) {
......
......@@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
......@@ -40,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/strategicpatch"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
......@@ -67,7 +69,7 @@ type ResourceActorFunc func(*resource.Info) error
// Create creates kubernetes resources from an io.reader
//
// Namespace will set the namespace
func (c *Client) Create(namespace string, reader io.Reader) error {
func (c *Client) Create(namespace string, reader io.Reader, timeout int64, shouldWait bool) error {
client, err := c.ClientSet()
if err != nil {
return err
......@@ -75,7 +77,18 @@ func (c *Client) Create(namespace string, reader io.Reader) error {
if err := ensureNamespace(client, namespace); err != nil {
return err
}
return perform(c, namespace, reader, createResource)
infos, buildErr := c.Build(namespace, reader)
if buildErr != nil {
return buildErr
}
err = perform(c, namespace, infos, createResource)
if err != nil {
return err
}
if shouldWait {
err = c.waitForResources(time.Duration(timeout)*time.Second, infos)
}
return err
}
func (c *Client) newBuilder(namespace string, reader io.Reader) *resource.Result {
......@@ -107,7 +120,11 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) {
// Since we don't know what order the objects come in, let's group them by the types, so
// that when we print them, they come looking good (headers apply to subgroups, etc.)
objs := make(map[string][]runtime.Object)
err := perform(c, namespace, reader, func(info *resource.Info) error {
infos, err := c.Build(namespace, reader)
if err != nil {
return "", err
}
err = perform(c, namespace, infos, func(info *resource.Info) error {
log.Printf("Doing get for: '%s'", info.Name)
obj, err := resource.NewHelper(info.Client, info.Mapping).Get(info.Namespace, info.Name, info.Export)
if err != nil {
......@@ -159,7 +176,7 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) {
// not present in the target configuration
//
// Namespace will set the namespaces
func (c *Client) Update(namespace string, originalReader, targetReader io.Reader, recreate bool) error {
func (c *Client) Update(namespace string, originalReader, targetReader io.Reader, recreate bool, timeout int64, shouldWait bool) error {
original, err := c.Build(namespace, originalReader)
if err != nil {
return fmt.Errorf("failed decoding reader into objects: %s", err)
......@@ -224,6 +241,12 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader
log.Printf("Failed to delete %s, err: %s", info.Name, err)
}
}
if shouldWait {
err = c.waitForResources(time.Duration(timeout)*time.Second, target)
}
if err != nil {
return err
}
return nil
}
......@@ -231,7 +254,11 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader
//
// Namespace will set the namespace
func (c *Client) Delete(namespace string, reader io.Reader) error {
return perform(c, namespace, reader, func(info *resource.Info) error {
infos, err := c.Build(namespace, reader)
if err != nil {
return err
}
return perform(c, namespace, infos, func(info *resource.Info) error {
log.Printf("Starting delete for %s %s", info.Name, info.Mapping.GroupVersionKind.Kind)
err := deleteResource(c, info)
return skipIfNotFound(err)
......@@ -264,18 +291,18 @@ func watchTimeout(t time.Duration) ResourceActorFunc {
// ascertained by watching the Status fields in a job's output.
//
// Handling for other kinds will be added as necessary.
func (c *Client) WatchUntilReady(namespace string, reader io.Reader, timeout int64) error {
func (c *Client) WatchUntilReady(namespace string, reader io.Reader, timeout int64, shouldWait bool) error {
infos, err := c.Build(namespace, reader)
if err != nil {
return err
}
// For jobs, there's also the option to do poll c.Jobs(namespace).Get():
// https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300
return perform(c, namespace, reader, watchTimeout(time.Duration(timeout)*time.Second))
return perform(c, namespace, infos, watchTimeout(time.Duration(timeout)*time.Second))
}
func perform(c *Client, namespace string, reader io.Reader, fn ResourceActorFunc) error {
infos, err := c.Build(namespace, reader)
switch {
case err != nil:
return err
case len(infos) == 0:
func perform(c *Client, namespace string, infos Result, fn ResourceActorFunc) error {
if len(infos) == 0 {
return ErrNoObjectsVisited
}
for _, info := range infos {
......@@ -287,8 +314,12 @@ func perform(c *Client, namespace string, reader io.Reader, fn ResourceActorFunc
}
func createResource(info *resource.Info) error {
_, err := resource.NewHelper(info.Client, info.Mapping).Create(info.Namespace, true, info.Object)
obj, err := resource.NewHelper(info.Client, info.Mapping).Create(info.Namespace, true, info.Object)
if err != nil {
return err
}
info.Refresh(obj, true)
return nil
}
func deleteResource(c *Client, info *resource.Info) error {
......@@ -328,7 +359,8 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
// send patch to server
helper := resource.NewHelper(target.Client, target.Mapping)
if _, err = helper.Patch(target.Namespace, target.Name, api.StrategicMergePatchType, patch); err != nil {
var obj runtime.Object
if obj, err = helper.Patch(target.Namespace, target.Name, api.StrategicMergePatchType, patch); err != nil {
return err
}
......@@ -336,6 +368,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
client, _ := c.ClientSet()
return recreatePods(client, target.Namespace, extractSelector(currentObj))
}
target.Refresh(obj, true)
return nil
}
......@@ -418,6 +451,132 @@ func watchUntilReady(timeout time.Duration, info *resource.Info) error {
return err
}
func podsReady(pods []api.Pod) bool {
if len(pods) == 0 {
return true
}
for _, pod := range pods {
if !api.IsPodReady(&pod) {
return false
}
}
return true
}
func servicesReady(svc []api.Service) bool {
if len(svc) == 0 {
return true
}
for _, s := range svc {
if !api.IsServiceIPSet(&s) {
return false
}
// This checks if the service has a LoadBalancer and that balancer has an Ingress defined
if s.Spec.Type == api.ServiceTypeLoadBalancer && s.Status.LoadBalancer.Ingress == nil {
return false
}
}
return true
}
func volumesReady(vols []api.PersistentVolumeClaim) bool {
if len(vols) == 0 {
return true
}
for _, v := range vols {
if v.Status.Phase != api.ClaimBound {
return false
}
}
return true
}
func getPods(client *internalclientset.Clientset, namespace string, selector map[string]string) ([]api.Pod, error) {
list, err := client.Pods(namespace).List(api.ListOptions{
FieldSelector: fields.Everything(),
LabelSelector: labels.Set(selector).AsSelector(),
})
if err != nil {
return nil, err
}
return list.Items, nil
}
// waitForResources polls to get the current status of all pods, PVCs, and Services
// until all are ready or a timeout is reached
func (c *Client) waitForResources(timeout time.Duration, created Result) error {
log.Printf("beginning wait for resources with timeout of %v", timeout)
client, _ := c.ClientSet()
return wait.Poll(2*time.Second, timeout, func() (bool, error) {
pods := []api.Pod{}
services := []api.Service{}
pvc := []api.PersistentVolumeClaim{}
for _, v := range created {
switch value := v.Object.(type) {
case (*api.ReplicationController):
list, err := getPods(client, value.Namespace, value.Spec.Selector)
if err != nil {
return false, err
}
pods = append(pods, list...)
case (*api.Pod):
pod, err := client.Pods(value.Namespace).Get(value.Name)
if err != nil {
return false, err
}
pods = append(pods, *pod)
case (*extensions.Deployment):
// Get the RS children first
rs, err := client.ReplicaSets(value.Namespace).List(api.ListOptions{
FieldSelector: fields.Everything(),
LabelSelector: labels.Set(value.Spec.Selector.MatchLabels).AsSelector(),
})
if err != nil {
return false, err
}
for _, r := range rs.Items {
list, err := getPods(client, value.Namespace, r.Spec.Selector.MatchLabels)
if err != nil {
return false, err
}
pods = append(pods, list...)
}
case (*extensions.DaemonSet):
list, err := getPods(client, value.Namespace, value.Spec.Selector.MatchLabels)
if err != nil {
return false, err
}
pods = append(pods, list...)
case (*apps.StatefulSet):
list, err := getPods(client, value.Namespace, value.Spec.Selector.MatchLabels)
if err != nil {
return false, err
}
pods = append(pods, list...)
case (*extensions.ReplicaSet):
list, err := getPods(client, value.Namespace, value.Spec.Selector.MatchLabels)
if err != nil {
return false, err
}
pods = append(pods, list...)
case (*api.PersistentVolumeClaim):
claim, err := client.PersistentVolumeClaims(value.Namespace).Get(value.Name)
if err != nil {
return false, err
}
pvc = append(pvc, *claim)
case (*api.Service):
svc, err := client.Services(value.Namespace).Get(value.Name)
if err != nil {
return false, err
}
services = append(services, *svc)
}
}
return podsReady(pods) && servicesReady(services) && volumesReady(pvc), nil
})
}
// waitForJob is a helper that waits for a job to complete.
//
// This operates on an event returned from a watcher.
......
......@@ -105,7 +105,9 @@ func (f *fakeReaperFactory) Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, e
func TestUpdate(t *testing.T) {
listA := newPodList("starfish", "otter", "squid")
listB := newPodList("starfish", "otter", "dolphin")
listC := newPodList("starfish", "otter", "dolphin")
listB.Items[0].Spec.Containers[0].Ports = []api.ContainerPort{{Name: "https", ContainerPort: 443}}
listC.Items[0].Spec.Containers[0].Ports = []api.ContainerPort{{Name: "https", ContainerPort: 443}}
actions := make(map[string]string)
......@@ -148,10 +150,19 @@ func TestUpdate(t *testing.T) {
reaper := &fakeReaper{}
rf := &fakeReaperFactory{Factory: f, reaper: reaper}
c := &Client{Factory: rf}
if err := c.Update(api.NamespaceDefault, objBody(codec, &listA), objBody(codec, &listB), false); err != nil {
if err := c.Update(api.NamespaceDefault, objBody(codec, &listA), objBody(codec, &listB), false, 0, false); err != nil {
t.Fatal(err)
}
// TODO: Find a way to test methods that use Client Set
// Test with a wait
// if err := c.Update("test", objBody(codec, &listB), objBody(codec, &listC), false, 300, true); err != nil {
// t.Fatal(err)
// }
// Test with a wait should fail
// TODO: A way to make this not based off of an extremely short timeout?
// if err := c.Update("test", objBody(codec, &listC), objBody(codec, &listA), false, 2, true); err != nil {
// t.Fatal(err)
// }
expectedActions := map[string]string{
"/namespaces/default/pods/dolphin": "GET",
"/namespaces/default/pods/otter": "GET",
......@@ -171,7 +182,7 @@ func TestUpdate(t *testing.T) {
}
func TestPerform(t *testing.T) {
func TestBuild(t *testing.T) {
tests := []struct {
name string
namespace string
......@@ -186,12 +197,6 @@ func TestPerform(t *testing.T) {
namespace: "test",
reader: strings.NewReader(guestbookManifest),
count: 6,
}, {
name: "Empty manifests",
namespace: "test",
reader: strings.NewReader(""),
err: true,
errMessage: "no objects visited",
}, {
name: "Invalid schema",
namespace: "test",
......@@ -202,6 +207,59 @@ func TestPerform(t *testing.T) {
},
}
for _, tt := range tests {
f, tf, _, _ := cmdtesting.NewAPIFactory()
c := &Client{Factory: f}
if tt.swaggerFile != "" {
data, err := ioutil.ReadFile(tt.swaggerFile)
if err != nil {
t.Fatalf("could not read swagger spec: %s", err)
}
validator, err := validation.NewSwaggerSchemaFromBytes(data, nil)
if err != nil {
t.Fatalf("could not load swagger spec: %s", err)
}
tf.Validator = validator
}
// Test for an invalid manifest
infos, err := c.Build(tt.namespace, tt.reader)
if err != nil && err.Error() != tt.errMessage {
t.Errorf("%q. expected error message: %v, got %v", tt.name, tt.errMessage, err)
} else if err != nil && !tt.err {
t.Errorf("%q. Got error message when no error should have occurred: %v, got %v", tt.name, tt.errMessage, err)
}
if len(infos) != tt.count {
t.Errorf("%q. expected %d result objects, got %d", tt.name, tt.count, len(infos))
}
}
}
func TestPerform(t *testing.T) {
tests := []struct {
name string
namespace string
reader io.Reader
count int
swaggerFile string
err bool
errMessage string
}{
{
name: "Valid input",
namespace: "test",
reader: strings.NewReader(guestbookManifest),
count: 6,
}, {
name: "Empty manifests",
namespace: "test",
reader: strings.NewReader(""),
err: true,
errMessage: "no objects visited",
},
}
for _, tt := range tests {
results := []*resource.Info{}
......@@ -228,7 +286,12 @@ func TestPerform(t *testing.T) {
tf.Validator = validator
}
err := perform(c, tt.namespace, tt.reader, fn)
infos, err := c.Build(tt.namespace, tt.reader)
if err != nil && err.Error() != tt.errMessage {
t.Errorf("%q. Error while building manifests: %v", tt.name, err)
}
err = perform(c, tt.namespace, infos, fn)
if (err != nil) != tt.err {
t.Errorf("%q. expected error: %v, got %v", tt.name, tt.err, err)
}
......@@ -245,13 +308,13 @@ func TestPerform(t *testing.T) {
func TestReal(t *testing.T) {
t.Skip("This is a live test, comment this line to run")
c := New(nil)
if err := c.Create("test", strings.NewReader(guestbookManifest)); err != nil {
if err := c.Create("test", strings.NewReader(guestbookManifest), 300, false); err != nil {
t.Fatal(err)
}
testSvcEndpointManifest := testServiceManifest + "\n---\n" + testEndpointManifest
c = New(nil)
if err := c.Create("test-delete", strings.NewReader(testSvcEndpointManifest)); err != nil {
if err := c.Create("test-delete", strings.NewReader(testSvcEndpointManifest), 300, false); err != nil {
t.Fatal(err)
}
......
This diff is collapsed.
......@@ -99,7 +99,7 @@ type KubeClient interface {
//
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Create(namespace string, reader io.Reader) error
Create(namespace string, reader io.Reader, timeout int64, shouldWait bool) error
// Get gets one or more resources. Returned string hsa the format like kubectl
// provides with the column headers separating the resource types.
......@@ -123,7 +123,7 @@ type KubeClient interface {
// For Jobs, "ready" means the job ran to completion (excited without error).
// For all other kinds, it means the kind was created or modified without
// error.
WatchUntilReady(namespace string, reader io.Reader, timeout int64) error
WatchUntilReady(namespace string, reader io.Reader, timeout int64, shouldWait bool) error
// Update updates one or more resources or creates the resource
// if it doesn't exist
......@@ -132,7 +132,7 @@ type KubeClient interface {
//
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Update(namespace string, originalReader, modifiedReader io.Reader, recreate bool) error
Update(namespace string, originalReader, modifiedReader io.Reader, recreate bool, timeout int64, shouldWait bool) error
Build(namespace string, reader io.Reader) (kube.Result, error)
}
......@@ -144,7 +144,7 @@ type PrintingKubeClient struct {
}
// Create prints the values of what would be created with a real KubeClient.
func (p *PrintingKubeClient) Create(ns string, r io.Reader) error {
func (p *PrintingKubeClient) Create(ns string, r io.Reader, timeout int64, shouldWait bool) error {
_, err := io.Copy(p.Out, r)
return err
}
......@@ -164,13 +164,13 @@ func (p *PrintingKubeClient) Delete(ns string, r io.Reader) error {
}
// WatchUntilReady implements KubeClient WatchUntilReady.
func (p *PrintingKubeClient) WatchUntilReady(ns string, r io.Reader, t int64) error {
func (p *PrintingKubeClient) WatchUntilReady(ns string, r io.Reader, timeout int64, shouldWait bool) error {
_, err := io.Copy(p.Out, r)
return err
}
// Update implements KubeClient Update.
func (p *PrintingKubeClient) Update(ns string, currentReader, modifiedReader io.Reader, recreate bool) error {
func (p *PrintingKubeClient) Update(ns string, currentReader, modifiedReader io.Reader, recreate bool, timeout int64, shouldWait bool) error {
_, err := io.Copy(p.Out, modifiedReader)
return err
}
......
......@@ -37,7 +37,7 @@ func (e *mockEngine) Render(chrt *chart.Chart, v chartutil.Values) (map[string]s
type mockKubeClient struct{}
func (k *mockKubeClient) Create(ns string, r io.Reader) error {
func (k *mockKubeClient) Create(ns string, r io.Reader, timeout int64, shouldWait bool) error {
return nil
}
func (k *mockKubeClient) Get(ns string, r io.Reader) (string, error) {
......@@ -46,10 +46,10 @@ func (k *mockKubeClient) Get(ns string, r io.Reader) (string, error) {
func (k *mockKubeClient) Delete(ns string, r io.Reader) error {
return nil
}
func (k *mockKubeClient) Update(ns string, currentReader, modifiedReader io.Reader, recreate bool) error {
func (k *mockKubeClient) Update(ns string, currentReader, modifiedReader io.Reader, recreate bool, timeout int64, shouldWait bool) error {
return nil
}
func (k *mockKubeClient) WatchUntilReady(ns string, r io.Reader, t int64) error {
func (k *mockKubeClient) WatchUntilReady(ns string, r io.Reader, timeout int64, shouldWait bool) error {
return nil
}
func (k *mockKubeClient) Build(ns string, reader io.Reader) (kube.Result, error) {
......@@ -91,7 +91,7 @@ func TestKubeClient(t *testing.T) {
b.WriteString(content)
}
if err := env.KubeClient.Create("sharry-bobbins", b); err != nil {
if err := env.KubeClient.Create("sharry-bobbins", b, 300, false); err != nil {
t.Errorf("Kubeclient failed: %s", err)
}
}
......@@ -299,7 +299,7 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R
}
}
if err := s.performKubeUpdate(originalRelease, updatedRelease, req.Recreate); err != nil {
if err := s.performKubeUpdate(originalRelease, updatedRelease, req.Recreate, req.Timeout, req.Wait); err != nil {
log.Printf("warning: Release Upgrade %q failed: %s", updatedRelease.Name, err)
originalRelease.Info.Status.Code = release.Status_SUPERSEDED
updatedRelease.Info.Status.Code = release.Status_FAILED
......@@ -453,7 +453,7 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R
}
}
if err := s.performKubeUpdate(currentRelease, targetRelease, req.Recreate); err != nil {
if err := s.performKubeUpdate(currentRelease, targetRelease, req.Recreate, req.Timeout, req.Wait); err != nil {
log.Printf("warning: Release Rollback %q failed: %s", targetRelease.Name, err)
currentRelease.Info.Status.Code = release.Status_SUPERSEDED
targetRelease.Info.Status.Code = release.Status_FAILED
......@@ -477,11 +477,11 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R
return res, nil
}
func (s *ReleaseServer) performKubeUpdate(currentRelease, targetRelease *release.Release, recreate bool) error {
func (s *ReleaseServer) performKubeUpdate(currentRelease, targetRelease *release.Release, recreate bool, timeout int64, shouldWait bool) error {
kubeCli := s.env.KubeClient
current := bytes.NewBufferString(currentRelease.Manifest)
target := bytes.NewBufferString(targetRelease.Manifest)
return kubeCli.Update(targetRelease.Namespace, current, target, recreate)
return kubeCli.Update(targetRelease.Namespace, current, target, recreate, timeout, shouldWait)
}
// prepareRollback finds the previous release and prepares a new release object with
......@@ -820,7 +820,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
// so as to append to the old release's history
r.Version = old.Version + 1
if err := s.performKubeUpdate(old, r, false); err != nil {
if err := s.performKubeUpdate(old, r, false, req.Timeout, req.Wait); err != nil {
log.Printf("warning: Release replace %q failed: %s", r.Name, err)
old.Info.Status.Code = release.Status_SUPERSEDED
r.Info.Status.Code = release.Status_FAILED
......@@ -833,7 +833,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
// nothing to replace, create as normal
// regular manifests
b := bytes.NewBufferString(r.Manifest)
if err := s.env.KubeClient.Create(r.Namespace, b); err != nil {
if err := s.env.KubeClient.Create(r.Namespace, b, req.Timeout, req.Wait); err != nil {
log.Printf("warning: Release %q failed: %s", r.Name, err)
r.Info.Status.Code = release.Status_FAILED
s.recordRelease(r, false)
......@@ -885,14 +885,14 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin
}
b := bytes.NewBufferString(h.Manifest)
if err := kubeCli.Create(namespace, b); err != nil {
if err := kubeCli.Create(namespace, b, timeout, false); err != nil {
log.Printf("warning: Release %q %s %s failed: %s", name, hook, h.Path, err)
return err
}
// No way to rewind a bytes.Buffer()?
b.Reset()
b.WriteString(h.Manifest)
if err := kubeCli.WatchUntilReady(namespace, b, timeout); err != nil {
if err := kubeCli.WatchUntilReady(namespace, b, timeout, false); err != nil {
log.Printf("warning: Release %q %s %s could not complete: %s", name, hook, h.Path, err)
return err
}
......
......@@ -1390,7 +1390,7 @@ type updateFailingKubeClient struct {
environment.PrintingKubeClient
}
func (u *updateFailingKubeClient) Update(namespace string, originalReader, modifiedReader io.Reader, recreate bool) error {
func (u *updateFailingKubeClient) Update(namespace string, originalReader, modifiedReader io.Reader, recreate bool, timeout int64, shouldWait bool) error {
return errors.New("Failed update in kube client")
}
......@@ -1404,7 +1404,7 @@ type hookFailingKubeClient struct {
environment.PrintingKubeClient
}
func (h *hookFailingKubeClient) WatchUntilReady(ns string, r io.Reader, t int64) error {
func (h *hookFailingKubeClient) WatchUntilReady(ns string, r io.Reader, timeout int64, shouldWait bool) error {
return errors.New("Failed watch")
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment