Commit 6b7efb9c authored by Adam Reese's avatar Adam Reese Committed by GitHub

Merge pull request #1941 from adamreese/feat/tpr-support

feat(kube): support third party resources
parents 5ee6ab12 0f461ba8
......@@ -82,7 +82,11 @@ func main() {
p.StringVarP(&grpcAddr, "listen", "l", ":44134", "address:port to listen on")
p.StringVar(&store, "storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'")
p.BoolVar(&enableTracing, "trace", false, "enable rpc tracing")
rootCommand.Execute()
if err := rootCommand.Execute(); err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(1)
}
}
func start(c *cobra.Command, args []string) {
......
hash: f59cd1f34ecf299aeb8ca9e16cfc181d8aeaf5a5762b89d76f18d08be35e4d67
updated: 2016-12-21T11:19:14.731981344-07:00
hash: d9b023509801b816bc80b3abd67eb80532af1625e71ad4e0ff8ef98664f96ded
updated: 2017-02-10T11:42:12.50337033-08:00
imports:
- name: cloud.google.com/go
version: 3b1ae45394a234c385be014e9a488f2bb6eef821
......@@ -290,7 +290,7 @@ imports:
- name: gopkg.in/yaml.v2
version: a83829b6f1293c91addabc89d0571c246397bbf4
- name: k8s.io/kubernetes
version: 5f332aab13e58173f85fd204a2c77731f7a2573f
version: 08e099554f3c31f6e6f07b448ab3ed78d0520507
subpackages:
- cmd/kubeadm/app/apis/kubeadm
- cmd/kubeadm/app/apis/kubeadm/install
......@@ -482,6 +482,7 @@ imports:
- pkg/util/yaml
- pkg/version
- pkg/watch
- pkg/watch/json
- pkg/watch/versioned
- plugin/pkg/client/auth
- plugin/pkg/client/auth/gcp
......
......@@ -55,6 +55,7 @@ import:
- openpgp
- package: github.com/gobwas/glob
version: ^0.2.1
- package: github.com/evanphx/json-patch
testImports:
- package: github.com/stretchr/testify
version: ^1.1.4
......
......@@ -18,6 +18,7 @@ package kube // import "k8s.io/helm/pkg/kube"
import (
"bytes"
"encoding/json"
goerrors "errors"
"fmt"
"io"
......@@ -25,6 +26,7 @@ import (
"strings"
"time"
jsonpatch "github.com/evanphx/json-patch"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/v1"
......@@ -78,7 +80,7 @@ func (c *Client) Create(namespace string, reader io.Reader, timeout int64, shoul
if err := ensureNamespace(client, namespace); err != nil {
return err
}
infos, buildErr := c.Build(namespace, reader)
infos, buildErr := c.BuildUnstructured(namespace, reader)
if buildErr != nil {
return buildErr
}
......@@ -107,6 +109,30 @@ func (c *Client) newBuilder(namespace string, reader io.Reader) *resource.Result
Do()
}
// BuildUnstructured validates for Kubernetes objects and returns unstructured infos.
func (c *Client) BuildUnstructured(namespace string, reader io.Reader) (Result, error) {
schema, err := c.Validator(true, c.SchemaCacheDir)
if err != nil {
log.Printf("warning: failed to load schema: %s", err)
}
mapper, typer, err := c.UnstructuredObject()
if err != nil {
log.Printf("failed to load mapper: %s", err)
return nil, err
}
var result Result
result, err = resource.NewBuilder(mapper, typer, resource.ClientMapperFunc(c.UnstructuredClientForMapping), runtime.UnstructuredJSONScheme).
ContinueOnError().
Schema(schema).
NamespaceParam(namespace).
DefaultNamespace().
Stream(reader, "").
Flatten().
Do().Infos()
return result, scrubValidationError(err)
}
// Build validates for Kubernetes objects and returns resource Infos from a io.Reader.
func (c *Client) Build(namespace string, reader io.Reader) (Result, error) {
var result Result
......@@ -121,7 +147,7 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) {
// Since we don't know what order the objects come in, let's group them by the types, so
// that when we print them, they come looking good (headers apply to subgroups, etc.)
objs := make(map[string][]runtime.Object)
infos, err := c.Build(namespace, reader)
infos, err := c.BuildUnstructured(namespace, reader)
if err != nil {
return "", err
}
......@@ -178,12 +204,12 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) {
//
// Namespace will set the namespaces
func (c *Client) Update(namespace string, originalReader, targetReader io.Reader, recreate bool, timeout int64, shouldWait bool) error {
original, err := c.Build(namespace, originalReader)
original, err := c.BuildUnstructured(namespace, originalReader)
if err != nil {
return fmt.Errorf("failed decoding reader into objects: %s", err)
}
target, err := c.Build(namespace, targetReader)
target, err := c.BuildUnstructured(namespace, targetReader)
if err != nil {
return fmt.Errorf("failed decoding reader into objects: %s", err)
}
......@@ -216,12 +242,7 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader
return fmt.Errorf("no resource with the name %s found", info.Name)
}
versionedObject, err := originalInfo.Mapping.ConvertToVersion(originalInfo.Object, originalInfo.Mapping.GroupVersionKind.GroupVersion())
if err != nil {
return err
}
if err := updateResource(c, info, versionedObject, recreate); err != nil {
if err := updateResource(c, info, originalInfo.Object, recreate); err != nil {
log.Printf("error updating the resource %s:\n\t %v", info.Name, err)
updateErrors = append(updateErrors, err.Error())
}
......@@ -245,17 +266,14 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader
if shouldWait {
err = c.waitForResources(time.Duration(timeout)*time.Second, target)
}
if err != nil {
return err
}
return nil
return err
}
// Delete deletes kubernetes resources from an io.reader
//
// Namespace will set the namespace
func (c *Client) Delete(namespace string, reader io.Reader) error {
infos, err := c.Build(namespace, reader)
infos, err := c.BuildUnstructured(namespace, reader)
if err != nil {
return err
}
......@@ -337,32 +355,51 @@ func deleteResource(c *Client, info *resource.Info) error {
return reaper.Stop(info.Namespace, info.Name, 0, nil)
}
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, recreate bool) error {
encoder := c.JSONEncoder()
original, err := runtime.Encode(encoder, currentObj)
func createPatch(target *resource.Info, currentObj runtime.Object) ([]byte, api.PatchType, error) {
// Get a versioned object
versionedObject, err := api.Scheme.New(target.Mapping.GroupVersionKind)
if err != nil {
return err
return nil, api.StrategicMergePatchType, fmt.Errorf("failed to get versionedObject: %s", err)
}
modified, err := runtime.Encode(encoder, target.Object)
oldData, err := json.Marshal(currentObj)
if err != nil {
return err
return nil, api.StrategicMergePatchType, fmt.Errorf("serializing current configuration: %s", err)
}
newData, err := json.Marshal(target.Object)
if err != nil {
return nil, api.StrategicMergePatchType, fmt.Errorf("serializing target configuration: %s", err)
}
if api.Semantic.DeepEqual(original, modified) {
log.Printf("Looks like there are no changes for %s", target.Name)
return nil
if api.Semantic.DeepEqual(oldData, newData) {
return nil, api.StrategicMergePatchType, nil
}
switch target.Object.(type) {
case *runtime.Unstructured:
patch, err := jsonpatch.CreateMergePatch(oldData, newData)
return patch, api.MergePatchType, err
default:
log.Printf("generating strategic merge patch for %T", target.Object)
patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, versionedObject)
return patch, api.StrategicMergePatchType, err
}
}
patch, err := strategicpatch.CreateTwoWayMergePatch(original, modified, currentObj)
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, recreate bool) error {
patch, patchType, err := createPatch(target, currentObj)
if err != nil {
return err
return fmt.Errorf("failed to create patch: %s", err)
}
if patch == nil {
log.Printf("Looks like there are no changes for %s", target.Name)
return nil
}
// send patch to server
helper := resource.NewHelper(target.Client, target.Mapping)
var obj runtime.Object
if obj, err = helper.Patch(target.Namespace, target.Name, api.StrategicMergePatchType, patch); err != nil {
if obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch); err != nil {
return err
}
......@@ -585,7 +622,7 @@ func (c *Client) waitForResources(timeout time.Duration, created Result) error {
func waitForJob(e watch.Event, name string) (bool, error) {
o, ok := e.Object.(*batch.Job)
if !ok {
return true, fmt.Errorf("Expected %s to be a *batch.Job, got %T", name, o)
return true, fmt.Errorf("Expected %s to be a *batch.Job, got %T", name, e.Object)
}
for _, c := range o.Status.Conditions {
......
......@@ -169,9 +169,9 @@ func TestUpdate(t *testing.T) {
t.Fatalf("could not dump request: %s", err)
}
req.Body.Close()
expected := `{"spec":{"containers":[{"name":"app:v4","ports":[{"containerPort":443,"name":"https","protocol":"TCP"},{"$patch":"delete","containerPort":80}]}]}}`
expected := `{"spec":{"containers":[{"image":"abc/app:v4","name":"app:v4","ports":[{"containerPort":443,"name":"https"}],"resources":{}}]}}`
if string(data) != expected {
t.Errorf("expected patch %s, got %s", expected, string(data))
t.Errorf("expected patch\n%s\ngot\n%s", expected, string(data))
}
return newResponse(200, &listB.Items[0])
case p == "/namespaces/default/pods" && m == "POST":
......
......@@ -137,6 +137,7 @@ type KubeClient interface {
Update(namespace string, originalReader, modifiedReader io.Reader, recreate bool, timeout int64, shouldWait bool) error
Build(namespace string, reader io.Reader) (kube.Result, error)
BuildUnstructured(namespace string, reader io.Reader) (kube.Result, error)
// WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
// and returns said phase (PodSucceeded or PodFailed qualify)
......@@ -186,6 +187,11 @@ func (p *PrintingKubeClient) Build(ns string, reader io.Reader) (kube.Result, er
return []*resource.Info{}, nil
}
// BuildUnstructured implements KubeClient BuildUnstructured.
func (p *PrintingKubeClient) BuildUnstructured(ns string, reader io.Reader) (kube.Result, error) {
return []*resource.Info{}, nil
}
// WaitAndGetCompletedPodPhase implements KubeClient WaitAndGetCompletedPodPhase
func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(namespace string, reader io.Reader, timeout time.Duration) (api.PodPhase, error) {
_, err := io.Copy(p.Out, reader)
......
......@@ -57,6 +57,9 @@ func (k *mockKubeClient) WatchUntilReady(ns string, r io.Reader, timeout int64,
func (k *mockKubeClient) Build(ns string, reader io.Reader) (kube.Result, error) {
return []*resource.Info{}, nil
}
func (k *mockKubeClient) BuildUnstructured(ns string, reader io.Reader) (kube.Result, error) {
return []*resource.Info{}, nil
}
func (k *mockKubeClient) WaitAndGetCompletedPodPhase(namespace string, reader io.Reader, timeout time.Duration) (api.PodPhase, error) {
return api.PodUnknown, nil
}
......
......@@ -1047,7 +1047,7 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR
func validateManifest(c environment.KubeClient, ns string, manifest []byte) error {
r := bytes.NewReader(manifest)
_, err := c.Build(ns, r)
_, err := c.BuildUnstructured(ns, r)
return err
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment