Commit 1c9ae557 authored by Maciej Kwiek's avatar Maciej Kwiek Committed by Maciej Kwiek

Basic Rudder Delete implementation

Extracted delete specific code from ReleaseServer to external function
which is called from both Local and Remote ReleaseModules.

Made getVersionSet function from tiller package exported.
parent 5937e1a2
......@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package main // import "k8s.io/helm/cmd/rudder"
package main
import (
"bytes"
......@@ -24,16 +24,26 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/helm/pkg/kube"
rudderAPI "k8s.io/helm/pkg/proto/hapi/rudder"
"k8s.io/helm/pkg/rudder"
"k8s.io/helm/pkg/tiller"
"k8s.io/helm/pkg/version"
)
var kubeClient = kube.New(nil)
var kubeClient *kube.Client
var clientset internalclientset.Interface
func main() {
var err error
kubeClient = kube.New(nil)
clientset, err = kubeClient.ClientSet()
if err != nil {
grpclog.Fatalf("Cannot initialize Kubernetes connection: %s", err)
}
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", rudder.GrpcPort))
if err != nil {
grpclog.Fatalf("failed to listen: %v", err)
......@@ -64,18 +74,42 @@ func (r *ReleaseModuleServiceServer) InstallRelease(ctx context.Context, in *rud
b := bytes.NewBufferString(in.Release.Manifest)
err := kubeClient.Create(in.Release.Namespace, b, 500, false)
if err != nil {
grpclog.Printf("error when creating release: %s", err)
grpclog.Printf("error when creating release: %v", err)
}
return &rudderAPI.InstallReleaseResponse{}, err
}
// DeleteRelease is not implemented
// DeleteRelease deletes a provided release
func (r *ReleaseModuleServiceServer) DeleteRelease(ctx context.Context, in *rudderAPI.DeleteReleaseRequest) (*rudderAPI.DeleteReleaseResponse, error) {
grpclog.Print("delete")
return nil, nil
resp := &rudderAPI.DeleteReleaseResponse{}
rel := in.Release
vs, err := tiller.GetVersionSet(clientset.Discovery())
if err != nil {
return resp, fmt.Errorf("Could not get apiVersions from Kubernetes: %v", err)
}
kept, errs := tiller.DeleteRelease(rel, vs, kubeClient)
rel.Manifest = kept
allErrors := ""
for _, e := range errs {
allErrors = allErrors + "\n" + e.Error()
}
if len(allErrors) > 0 {
err = fmt.Errorf(allErrors)
} else {
err = nil
}
return &rudderAPI.DeleteReleaseResponse{
Release: rel,
}, err
}
// RollbackRelease is not implemented
// RollbackRelease rolls back the release
func (r *ReleaseModuleServiceServer) RollbackRelease(ctx context.Context, in *rudderAPI.RollbackReleaseRequest) (*rudderAPI.RollbackReleaseResponse, error) {
grpclog.Print("rollback")
c := bytes.NewBufferString(in.Current.Manifest)
......
......@@ -1342,7 +1342,6 @@ var _ReleaseService_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("hapi/services/tiller.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
<<<<<<< b2afe5ec3cb08ec6ff5ca4200bd954d33154c948
// 1170 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x57, 0xdb, 0x6e, 0xe3, 0x44,
0x18, 0xae, 0xe3, 0x1c, 0xff, 0x1e, 0x48, 0xa7, 0x27, 0xd7, 0x02, 0x54, 0x8c, 0xa0, 0xd9, 0x85,
......
......@@ -77,3 +77,15 @@ func ReleaseStatus(req *rudderAPI.ReleaseStatusRequest) (*rudderAPI.ReleaseStatu
client := rudderAPI.NewReleaseModuleServiceClient(conn)
return client.ReleaseStatus(context.Background(), req)
}
// DeleteRelease calls Rudder DeleteRelease method which should uninstall provided release
func DeleteRelease(rel *rudderAPI.DeleteReleaseRequest) (*rudderAPI.DeleteReleaseResponse, error) {
conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure())
if err != nil {
return nil, err
}
defer conn.Close()
client := rudderAPI.NewReleaseModuleServiceClient(conn)
return client.DeleteRelease(context.Background(), rel)
}
......@@ -18,10 +18,19 @@ package tiller
import (
"bytes"
"errors"
"fmt"
"log"
"strings"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/helm/pkg/chartutil"
"k8s.io/helm/pkg/kube"
"k8s.io/helm/pkg/proto/hapi/release"
rudderAPI "k8s.io/helm/pkg/proto/hapi/rudder"
"k8s.io/helm/pkg/proto/hapi/services"
relutil "k8s.io/helm/pkg/releaseutil"
"k8s.io/helm/pkg/rudder"
"k8s.io/helm/pkg/tiller/environment"
)
......@@ -32,10 +41,13 @@ type ReleaseModule interface {
Update(current, target *release.Release, req *services.UpdateReleaseRequest, env *environment.Environment) error
Rollback(current, target *release.Release, req *services.RollbackReleaseRequest, env *environment.Environment) error
Status(r *release.Release, req *services.GetReleaseStatusRequest, env *environment.Environment) (string, error)
Delete(r *release.Release, req *services.UninstallReleaseRequest, env *environment.Environment) (string, []error)
}
// LocalReleaseModule is a local implementation of ReleaseModule
type LocalReleaseModule struct{}
type LocalReleaseModule struct {
clientset internalclientset.Interface
}
// Create creates a release via kubeclient from provided environment
func (m *LocalReleaseModule) Create(r *release.Release, req *services.InstallReleaseRequest, env *environment.Environment) error {
......@@ -59,6 +71,15 @@ func (m *LocalReleaseModule) Status(r *release.Release, req *services.GetRelease
return env.KubeClient.Get(r.Namespace, bytes.NewBufferString(r.Manifest))
}
// Delete deletes the release and returns manifests that were kept in the deletion process
func (m *LocalReleaseModule) Delete(rel *release.Release, req *services.UninstallReleaseRequest, env *environment.Environment) (kept string, errs []error) {
vs, err := GetVersionSet(m.clientset.Discovery())
if err != nil {
return rel.Manifest, []error{fmt.Errorf("Could not get apiVersions from Kubernetes: %v", err)}
}
return DeleteRelease(rel, vs, env.KubeClient)
}
// RemoteReleaseModule is a ReleaseModule which calls Rudder service to operate on a release
type RemoteReleaseModule struct{}
......@@ -100,3 +121,48 @@ func (m *RemoteReleaseModule) Status(r *release.Release, req *services.GetReleas
resp, err := rudder.ReleaseStatus(statusRequest)
return resp.Info.Status.Resources, err
}
// Delete calls rudder.DeleteRelease
func (m *RemoteReleaseModule) Delete(r *release.Release, req *services.UninstallReleaseRequest, env *environment.Environment) (string, []error) {
deleteRequest := &rudderAPI.DeleteReleaseRequest{Release: r}
resp, err := rudder.DeleteRelease(deleteRequest)
if err != nil {
return resp.Release.Manifest, []error{err}
}
return resp.Release.Manifest, []error{}
}
// DeleteRelease is a helper that allows Rudder to delete a release without exposing most of Tiller inner functions
func DeleteRelease(rel *release.Release, vs chartutil.VersionSet, kubeClient environment.KubeClient) (kept string, errs []error) {
manifests := relutil.SplitManifests(rel.Manifest)
_, files, err := sortManifests(manifests, vs, UninstallOrder)
if err != nil {
// We could instead just delete everything in no particular order.
// FIXME: One way to delete at this point would be to try a label-based
// deletion. The problem with this is that we could get a false positive
// and delete something that was not legitimately part of this release.
return rel.Manifest, []error{fmt.Errorf("corrupted release record. You must manually delete the resources: %s", err)}
}
filesToKeep, filesToDelete := filterManifestsToKeep(files)
if len(filesToKeep) > 0 {
kept = summarizeKeptManifests(filesToKeep)
}
errs = []error{}
for _, file := range filesToDelete {
b := bytes.NewBufferString(strings.TrimSpace(file.content))
if b.Len() == 0 {
continue
}
if err := kubeClient.Delete(rel.Namespace, b); err != nil {
log.Printf("uninstall: Failed deletion of %q: %s", rel.Name, err)
if err == kube.ErrNoObjectsVisited {
// Rewrite the message from "no objects visited"
err = errors.New("object not found, skipping delete")
}
errs = append(errs, err)
}
}
return kept, errs
}
......@@ -33,7 +33,6 @@ import (
"k8s.io/helm/pkg/chartutil"
"k8s.io/helm/pkg/hooks"
"k8s.io/helm/pkg/kube"
"k8s.io/helm/pkg/proto/hapi/chart"
"k8s.io/helm/pkg/proto/hapi/release"
"k8s.io/helm/pkg/proto/hapi/services"
......@@ -93,7 +92,9 @@ func NewReleaseServer(env *environment.Environment, clientset internalclientset.
if useRemote {
releaseModule = &RemoteReleaseModule{}
} else {
releaseModule = &LocalReleaseModule{}
releaseModule = &LocalReleaseModule{
clientset: clientset,
}
}
return &ReleaseServer{
......@@ -681,7 +682,7 @@ func capabilities(disc discovery.DiscoveryInterface) (*chartutil.Capabilities, e
if err != nil {
return nil, err
}
vs, err := getVersionSet(disc)
vs, err := GetVersionSet(disc)
if err != nil {
return nil, fmt.Errorf("Could not get apiVersions from Kubernetes: %s", err)
}
......@@ -769,7 +770,8 @@ func (s *ReleaseServer) prepareRelease(req *services.InstallReleaseRequest) (*re
return rel, err
}
func getVersionSet(client discovery.ServerGroupsInterface) (chartutil.VersionSet, error) {
// GetVersionSet retrieves a set of available k8s API versions
func GetVersionSet(client discovery.ServerGroupsInterface) (chartutil.VersionSet, error) {
groups, err := client.ServerGroups()
if err != nil {
return chartutil.DefaultVersionSet, err
......@@ -1050,47 +1052,19 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR
}
}
vs, err := getVersionSet(s.clientset.Discovery())
if err != nil {
return nil, fmt.Errorf("Could not get apiVersions from Kubernetes: %s", err)
}
// From here on out, the release is currently considered to be in Status_DELETING
// state.
if err := s.env.Releases.Update(rel); err != nil {
log.Printf("uninstall: Failed to store updated release: %s", err)
}
manifests := relutil.SplitManifests(rel.Manifest)
_, files, err := sortManifests(manifests, vs, UninstallOrder)
if err != nil {
// We could instead just delete everything in no particular order.
// FIXME: One way to delete at this point would be to try a label-based
// deletion. The problem with this is that we could get a false positive
// and delete something that was not legitimately part of this release.
return nil, fmt.Errorf("corrupted release record. You must manually delete the resources: %s", err)
}
kept, errs := s.ReleaseModule.Delete(rel, req, s.env)
res.Info = kept
filesToKeep, filesToDelete := filterManifestsToKeep(files)
if len(filesToKeep) > 0 {
res.Info = summarizeKeptManifests(filesToKeep)
}
// Collect the errors, and return them later.
es := []string{}
for _, file := range filesToDelete {
b := bytes.NewBufferString(strings.TrimSpace(file.content))
if b.Len() == 0 {
continue
}
if err := s.env.KubeClient.Delete(rel.Namespace, b); err != nil {
log.Printf("uninstall: Failed deletion of %q: %s", req.Name, err)
if err == kube.ErrNoObjectsVisited {
// Rewrite the message from "no objects visited"
err = errors.New("object not found, skipping delete")
}
es = append(es, err.Error())
}
es := make([]string, 0, len(errs))
for _, e := range errs {
log.Printf("error: %v", e)
es = append(es, e.Error())
}
if !req.DisableHooks {
......
......@@ -98,10 +98,13 @@ data:
`
func rsFixture() *ReleaseServer {
clientset := fake.NewSimpleClientset()
return &ReleaseServer{
ReleaseModule: &LocalReleaseModule{},
env: MockEnvironment(),
clientset: fake.NewSimpleClientset(),
ReleaseModule: &LocalReleaseModule{
clientset: clientset,
},
env: MockEnvironment(),
clientset: clientset,
}
}
......@@ -207,7 +210,7 @@ func TestValidName(t *testing.T) {
func TestGetVersionSet(t *testing.T) {
rs := rsFixture()
vs, err := getVersionSet(rs.clientset.Discovery())
vs, err := GetVersionSet(rs.clientset.Discovery())
if err != nil {
t.Error(err)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment