This repository has been archived on 2023-07-11. You can view files and clone it, but cannot push or open issues or pull requests.
vault-sidekick/vault.go
Rohith 9d02d7e843 - extracting the options completelt
- passing all the options into the write, effectivly allowing you to pass all options
- fixed the format of the content
- added a changelog
- shifting to version v0.0.6
2016-03-16 13:07:54 +00:00

440 lines
14 KiB
Go

/*
Copyright 2015 Home Office All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"net/http"
"time"
"github.com/golang/glog"
"github.com/hashicorp/vault/api"
"strings"
)
const (
// VaultAuth the method to use when authenticating to vault
VaultAuth = "method"
)
// AuthInterface is the authentication interface
type AuthInterface interface {
// Create and handle renewals of the token
Create(map[string]string) (string, error)
}
// VaultService is the main interface into the vault API - placing into a structure
// allows one to easily mock it and two to simplify the interface for us
type VaultService struct {
vaultURL string
// the vault client
client *api.Client
// the vault config
config *api.Config
// the token to authenticate with
token string
// the listener channel - technically we only have the one listener but there a long term reasons for adding this
listeners []chan VaultEvent
// a channel to inform of a new resource to processor
resourceChannel chan *watchedResource
}
// VaultEvent is the definition which captures a change
type VaultEvent struct {
// the resource this relates to
Resource *VaultResource
// the secret associated
Secret map[string]interface{}
}
// NewVaultService creates a new implementation to speak to vault and retrieve the resources
// url : the url of the vault service
func NewVaultService(url string) (*VaultService, error) {
var err error
// step: create the config for client
service := new(VaultService)
service.vaultURL = url
service.listeners = make([]chan VaultEvent, 0)
// step: create the service processor channels
service.resourceChannel = make(chan *watchedResource, 20)
// step: retrieve a vault client
service.client, err = newVaultClient(&options)
if err != nil {
return nil, err
}
// step: start the service processor off
service.vaultServiceProcessor()
return service, nil
}
// AddListener ... add a listener to the events listeners
func (r *VaultService) AddListener(ch chan VaultEvent) {
glog.V(10).Infof("adding the listener: %v", ch)
r.listeners = append(r.listeners, ch)
}
// Watch adds a watch on a resource and inform, renew which required and inform us when
// the resource is ready
func (r VaultService) Watch(rn *VaultResource) {
r.resourceChannel <- &watchedResource{resource: rn}
}
// vaultServiceProcessor is the background routine responsible for retrieving the resources, renewing when required and
// informing those who are watching the resource that something has changed
func (r *VaultService) vaultServiceProcessor() {
go func() {
// a list of resource being watched
var items []*watchedResource
// the channel to receive renewal notifications on
renewChannel := make(chan *watchedResource, 10)
retrieveChannel := make(chan *watchedResource, 10)
revokeChannel := make(chan *watchedResource, 10)
statsChannel := time.NewTicker(options.statsInterval)
for {
select {
// A new resource is being added to the service processor;
// - schedule the resource for retrieval
case x := <-r.resourceChannel:
glog.V(4).Infof("adding a resource into the service processor, resource: %s", x.resource)
// step: add to the list of resources
items = append(items, x)
// step: push into the retrieval channel
r.scheduleNow(x, retrieveChannel)
// Retrieve a resource from vault
// - we retrieve the resource from vault
// - if we error attempting to retrieve the secret, we background and reschedule an attempt to add it
// - if ok, we grab the lease it and lease time, we setup a notification on renewal
case x := <-retrieveChannel:
// step: save the current lease if we have one
leaseID := ""
if x.secret != nil && x.secret.LeaseID != "" {
leaseID = x.secret.LeaseID
glog.V(10).Infof("resource: %s has a previous lease: %s", x.resource, leaseID)
}
// step: retrieve the resource from vault
err := r.get(x)
if err != nil {
glog.Errorf("failed to retrieve the resource: %s from vault, error: %s", x.resource, err)
// reschedule the attempt for later
r.scheduleIn(x, retrieveChannel, getDurationWithin(3, 10))
break
}
glog.V(4).Infof("successfully retrieved resournce: %s, leaseID: %s", x.resource, x.secret.LeaseID)
// step: if we had a previous lease and the option is to revoke, lets throw into the revoke channel
if leaseID != "" && x.resource.revoked {
// step: make a rough copy
copy := &watchedResource{
secret: &api.Secret{
LeaseID: x.secret.LeaseID,
},
}
r.scheduleIn(copy, revokeChannel, x.resource.revokeDelay)
}
// step: setup a timer for renewal
x.notifyOnRenewal(renewChannel)
// step: update the upstream consumers
r.upstream(x)
// A watched resource is coming up for renewal
// - we attempt to renew the resource from vault
// - if we encounter an error, we reschedule the attempt for the future
// - if we're ok, we update the watchedResource and we send a notification of the change upstream
case x := <-renewChannel:
glog.V(4).Infof("resource: %s, lease: %s up for renewal, renewable: %t, revoked: %t", x.resource,
x.secret.LeaseID, x.resource.renewable, x.resource.revoked)
// step: we need to check if the lease has expired?
if time.Now().Before(x.leaseExpireTime) {
glog.V(3).Infof("the lease on resource: %s has expired, we need to get a new lease", x.resource)
// push into the retrieval channel and break
r.scheduleNow(x, retrieveChannel)
break
}
// step: are we renewing the resource?
if x.resource.renewable {
// step: is the underlining resource even renewable? - otherwise we can just grab a new lease
if !x.secret.Renewable {
glog.V(10).Infof("the resource: %s is not renewable, retrieving a new lease instead", x.resource)
r.scheduleNow(x, retrieveChannel)
break
}
// step: lets renew the resource
err := r.renew(x)
if err != nil {
glog.Errorf("failed to renew the resounce: %s for renewal, error: %s", x.resource, err)
// reschedule the attempt for later
r.scheduleIn(x, renewChannel, getDurationWithin(3, 10))
break
}
}
// step: the option for this resource is not to renew the secret but regenerate a new secret
if !x.resource.renewable {
glog.V(4).Infof("resource: %s flagged as not renewable, shifting to regenerating the resource", x.resource)
r.scheduleNow(x, retrieveChannel)
break
}
// step: setup a timer for renewal
x.notifyOnRenewal(renewChannel)
// step: update any listener upstream
r.upstream(x)
// We receive a lease ID along on the channel, just revoke the lease when you can
case x := <-revokeChannel:
err := r.revoke(x.secret.LeaseID)
if err != nil {
glog.Errorf("failed to revoke the lease: %s, error: %s", x.secret.LeaseID, err)
}
// The statistics timer has gone off; we iterate the watched items and
case <-statsChannel.C:
glog.V(3).Infof("stats: %d resources being watched", len(items))
for _, item := range items {
glog.V(3).Infof("resourse: %s, lease id: %s, renewal in: %s seconds, expiration: %s",
item.resource, item.secret.LeaseID, item.renewalTime, item.leaseExpireTime)
}
}
}
}()
}
// scheduleNow ... a helper method to perform an immediate reschedule into a channel
// rn : a pointer to the watched resource you wish to reschedule
// ch : the channel the resource should be placed into
func (r VaultService) scheduleNow(rn *watchedResource, ch chan *watchedResource) {
r.scheduleIn(rn, ch, time.Duration(0))
}
// scheduleIn ... schedules an event back into a channel after n seconds
// rn : a referrence some reason you wish to pass
// ch : the channel the resource should be placed into
// min : the minimum amount of time i'm willing to wait
// max : the maximum amount of time i'm willing to wait
func (r VaultService) scheduleIn(rn *watchedResource, ch chan *watchedResource, duration time.Duration) {
go func(x *watchedResource) {
glog.V(3).Infof("rescheduling the resource: %s, channel: %v", rn.resource, ch)
// step: are we doing a random wait?
if duration > 0 {
<-time.After(duration)
}
ch <- x
}(rn)
}
// upstream ... the resource has changed thus we notify the upstream listener
// item : the item which has changed
func (r VaultService) upstream(item *watchedResource) {
// step: chunk this into a go-routine not to block us
for _, listener := range r.listeners {
go func(ch chan VaultEvent) {
ch <- VaultEvent{
Resource: item.resource,
Secret: item.secret.Data,
}
}(listener)
}
}
// renew attempts to renew the lease on a resource
// rn : the resource we wish to renew the lease on
func (r VaultService) renew(rn *watchedResource) error {
glog.V(4).Infof("attempting to renew the lease: %s on resource: %s", rn.secret.LeaseID, rn.resource)
// step: check the resource is renewable
if !rn.secret.Renewable {
return fmt.Errorf("the resource: %s is not renewable", rn.resource)
}
secret, err := r.client.Sys().Renew(rn.secret.LeaseID, 0)
if err != nil {
return err
}
// step: update the resource
rn.lastUpdated = time.Now()
rn.leaseExpireTime = rn.lastUpdated.Add(time.Duration(secret.LeaseDuration))
glog.V(3).Infof("renewed resource: %s, leaseId: %s, lease_time: %s, expiration: %s",
rn.resource, rn.secret.LeaseID, rn.secret.LeaseID, rn.leaseExpireTime)
return nil
}
// revoke attempts to revoke the lease of a resource
// lease : the lease lease which was given when you got it
func (r VaultService) revoke(lease string) error {
glog.V(3).Infof("attemping to revoking the lease: %s", lease)
err := r.client.Sys().Revoke(lease)
if err != nil {
return err
}
glog.V(3).Infof("successfully revoked the leaseId: %s", lease)
return nil
}
// get retrieves a secret from the vault
// rn : the watched resource
func (r VaultService) get(rn *watchedResource) (err error) {
var secret *api.Secret
// step: not sure who to cast map[string]string to map[string]interface{} doesn't like it anyway i try and do it
params := make(map[string]interface{}, 0)
for k, v := range rn.resource.options {
params[k] = interface{}(v)
}
glog.V(10).Infof("get path: %s, params: %v", rn.resource.path, params)
glog.V(5).Infof("attempting to retrieve the resource: %s from vault", rn.resource)
// step: perform a request to vault
switch rn.resource.resource {
case "pki":
secret, err = r.client.Logical().Write(fmt.Sprintf(rn.resource.path), params)
case "transit":
secret, err = r.client.Logical().Write(fmt.Sprintf(rn.resource.path), params)
case "aws":
fallthrough
case "cubbyhole":
fallthrough
case "mysql":
fallthrough
case "postgres":
fallthrough
case "secret":
secret, err = r.client.Logical().Read(rn.resource.path)
}
// step: check the error if any
if err != nil {
if strings.Contains(err.Error(), "missing client token") {
// decision: until the rewrite, lets just exit for now
glog.Fatalf("the vault token is no longer valid, exitting, error: %s", err)
}
return err
}
if secret == nil && err != nil {
return fmt.Errorf("the resource does not exist")
}
if secret == nil {
return fmt.Errorf("unable to retrieve the secret")
}
// step: update the watched resource
rn.lastUpdated = time.Now()
rn.secret = secret
rn.leaseExpireTime = rn.lastUpdated.Add(time.Duration(secret.LeaseDuration))
glog.V(3).Infof("retrieved resource: %s, leaseId: %s, lease_time: %s",
rn.resource, rn.secret.LeaseID, time.Duration(rn.secret.LeaseDuration)*time.Second)
return err
}
// newVaultClient creates and authenticates a vault client
func newVaultClient(opts *config) (*api.Client, error) {
var err error
var token string
config := api.DefaultConfig()
config.Address = opts.vaultURL
config.HttpClient.Transport, err = buildHTTPTransport(opts)
if err != nil {
return nil, err
}
// step: create the actual client
client, err := api.NewClient(config)
if err != nil {
return nil, err
}
plugin, _ := opts.vaultAuthOptions[VaultAuth]
switch plugin {
case "userpass":
token, err = NewUserPassPlugin(client).Create(opts.vaultAuthOptions)
case "token":
opts.vaultAuthOptions["filename"] = options.vaultAuthFile
token, err = NewUserTokenPlugin(client).Create(opts.vaultAuthOptions)
default:
return nil, fmt.Errorf("unsupported authentication plugin: %s", plugin)
}
if err != nil {
return nil, err
}
// step: set the token for the client
client.SetToken(token)
return client, nil
}
// buildHTTPTransport constructs a http transport for the http client
func buildHTTPTransport(opts *config) (*http.Transport, error) {
// step: create the vault sidekick
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 10 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
}
// step: are we skip the tls verify?
if options.tlsVerify {
transport.TLSClientConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
// step: are we loading a CA file
if opts.vaultCaFile != "" {
// step: load the ca file
caCert, err := ioutil.ReadFile(opts.vaultCaFile)
if err != nil {
return nil, fmt.Errorf("unable to read in the ca: %s, reason: %s", opts.vaultCaFile, err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
// step: add the ca to the root
transport.TLSClientConfig.RootCAs = caCertPool
}
return transport, nil
}