Merge remote-tracking branch 'origin/develop' into feat/EE-189/EE-248/support-automated-sync-for-stacks
This commit is contained in:
@@ -5,7 +5,7 @@ import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/portainer/portainer/api"
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
19
api/cmd/portainer/log.go
Normal file
19
api/cmd/portainer/log.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func configureLogger() {
|
||||
logger := logrus.New() // logger is to implicitly substitute stdlib's log
|
||||
log.SetOutput(logger.Writer())
|
||||
|
||||
formatter := &logrus.TextFormatter{DisableTimestamp: true, DisableLevelTruncation: true}
|
||||
logger.SetFormatter(formatter)
|
||||
logrus.SetFormatter(formatter)
|
||||
|
||||
logger.SetLevel(logrus.DebugLevel)
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
}
|
||||
@@ -92,8 +92,8 @@ func initSwarmStackManager(assetsPath string, dataStorePath string, signatureSer
|
||||
return exec.NewSwarmStackManager(assetsPath, dataStorePath, signatureService, fileService, reverseTunnelService)
|
||||
}
|
||||
|
||||
func initKubernetesDeployer(dataStore portainer.DataStore, reverseTunnelService portainer.ReverseTunnelService, signatureService portainer.DigitalSignatureService, assetsPath string) portainer.KubernetesDeployer {
|
||||
return exec.NewKubernetesDeployer(dataStore, reverseTunnelService, signatureService, assetsPath)
|
||||
func initKubernetesDeployer(kubernetesTokenCacheManager *kubeproxy.TokenCacheManager, kubernetesClientFactory *kubecli.ClientFactory, dataStore portainer.DataStore, reverseTunnelService portainer.ReverseTunnelService, signatureService portainer.DigitalSignatureService, assetsPath string) portainer.KubernetesDeployer {
|
||||
return exec.NewKubernetesDeployer(kubernetesTokenCacheManager, kubernetesClientFactory, dataStore, reverseTunnelService, signatureService, assetsPath)
|
||||
}
|
||||
|
||||
func initJWTService(dataStore portainer.DataStore) (portainer.JWTService, error) {
|
||||
@@ -405,7 +405,7 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server {
|
||||
|
||||
composeStackManager := initComposeStackManager(*flags.Assets, *flags.Data, reverseTunnelService, proxyManager)
|
||||
|
||||
kubernetesDeployer := initKubernetesDeployer(dataStore, reverseTunnelService, digitalSignatureService, *flags.Assets)
|
||||
kubernetesDeployer := initKubernetesDeployer(kubernetesTokenCacheManager, kubernetesClientFactory, dataStore, reverseTunnelService, digitalSignatureService, *flags.Assets)
|
||||
|
||||
if dataStore.IsNew() {
|
||||
err = updateSettingsFromFlags(dataStore, flags)
|
||||
@@ -506,6 +506,8 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server {
|
||||
func main() {
|
||||
flags := initCLI()
|
||||
|
||||
configureLogger()
|
||||
|
||||
for {
|
||||
server := buildServer(flags)
|
||||
log.Printf("Starting Portainer %s on %s\n", portainer.APIVersion, *flags.Addr)
|
||||
|
||||
@@ -5,6 +5,9 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/portainer/portainer/api/http/proxy/factory/kubernetes"
|
||||
"github.com/portainer/portainer/api/http/security"
|
||||
"github.com/portainer/portainer/api/kubernetes/cli"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@@ -20,27 +23,64 @@ import (
|
||||
|
||||
// KubernetesDeployer represents a service to deploy resources inside a Kubernetes environment.
|
||||
type KubernetesDeployer struct {
|
||||
binaryPath string
|
||||
dataStore portainer.DataStore
|
||||
reverseTunnelService portainer.ReverseTunnelService
|
||||
signatureService portainer.DigitalSignatureService
|
||||
binaryPath string
|
||||
dataStore portainer.DataStore
|
||||
reverseTunnelService portainer.ReverseTunnelService
|
||||
signatureService portainer.DigitalSignatureService
|
||||
kubernetesClientFactory *cli.ClientFactory
|
||||
kubernetesTokenCacheManager *kubernetes.TokenCacheManager
|
||||
}
|
||||
|
||||
// NewKubernetesDeployer initializes a new KubernetesDeployer service.
|
||||
func NewKubernetesDeployer(datastore portainer.DataStore, reverseTunnelService portainer.ReverseTunnelService, signatureService portainer.DigitalSignatureService, binaryPath string) *KubernetesDeployer {
|
||||
func NewKubernetesDeployer(kubernetesTokenCacheManager *kubernetes.TokenCacheManager, kubernetesClientFactory *cli.ClientFactory, datastore portainer.DataStore, reverseTunnelService portainer.ReverseTunnelService, signatureService portainer.DigitalSignatureService, binaryPath string) *KubernetesDeployer {
|
||||
return &KubernetesDeployer{
|
||||
binaryPath: binaryPath,
|
||||
dataStore: datastore,
|
||||
reverseTunnelService: reverseTunnelService,
|
||||
signatureService: signatureService,
|
||||
binaryPath: binaryPath,
|
||||
dataStore: datastore,
|
||||
reverseTunnelService: reverseTunnelService,
|
||||
signatureService: signatureService,
|
||||
kubernetesClientFactory: kubernetesClientFactory,
|
||||
kubernetesTokenCacheManager: kubernetesTokenCacheManager,
|
||||
}
|
||||
}
|
||||
|
||||
func (deployer *KubernetesDeployer) getToken(request *http.Request, endpoint *portainer.Endpoint, setLocalAdminToken bool) (string, error) {
|
||||
tokenData, err := security.RetrieveTokenData(request)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
kubecli, err := deployer.kubernetesClientFactory.GetKubeClient(endpoint)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
tokenCache := deployer.kubernetesTokenCacheManager.GetOrCreateTokenCache(int(endpoint.ID))
|
||||
|
||||
tokenManager, err := kubernetes.NewTokenManager(kubecli, deployer.dataStore, tokenCache, setLocalAdminToken)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if tokenData.Role == portainer.AdministratorRole {
|
||||
return tokenManager.GetAdminServiceAccountToken(), nil
|
||||
}
|
||||
|
||||
token, err := tokenManager.GetUserServiceAccountToken(int(tokenData.ID), endpoint.ID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if token == "" {
|
||||
return "", fmt.Errorf("can not get a valid user service account token")
|
||||
}
|
||||
return token, nil
|
||||
}
|
||||
|
||||
// Deploy will deploy a Kubernetes manifest inside a specific namespace in a Kubernetes endpoint.
|
||||
// Otherwise it will use kubectl to deploy the manifest.
|
||||
func (deployer *KubernetesDeployer) Deploy(endpoint *portainer.Endpoint, stackConfig string, namespace string) (string, error) {
|
||||
func (deployer *KubernetesDeployer) Deploy(request *http.Request, endpoint *portainer.Endpoint, stackConfig string, namespace string) (string, error) {
|
||||
if endpoint.Type == portainer.KubernetesLocalEnvironment {
|
||||
token, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
|
||||
token, err := deployer.getToken(request, endpoint, true);
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -53,7 +93,7 @@ func (deployer *KubernetesDeployer) Deploy(endpoint *portainer.Endpoint, stackCo
|
||||
args := make([]string, 0)
|
||||
args = append(args, "--server", endpoint.URL)
|
||||
args = append(args, "--insecure-skip-tls-verify")
|
||||
args = append(args, "--token", string(token))
|
||||
args = append(args, "--token", token)
|
||||
args = append(args, "--namespace", namespace)
|
||||
args = append(args, "apply", "-f", "-")
|
||||
|
||||
@@ -139,8 +179,14 @@ func (deployer *KubernetesDeployer) Deploy(endpoint *portainer.Endpoint, stackCo
|
||||
return "", err
|
||||
}
|
||||
|
||||
token, err := deployer.getToken(request, endpoint, false);
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
req.Header.Set(portainer.PortainerAgentPublicKeyHeader, deployer.signatureService.EncodedPublicKey())
|
||||
req.Header.Set(portainer.PortainerAgentSignatureHeader, signature)
|
||||
req.Header.Set(portainer.PortainerAgentKubernetesSATokenHeader, token)
|
||||
|
||||
resp, err := httpCli.Do(req)
|
||||
if err != nil {
|
||||
|
||||
@@ -32,6 +32,7 @@ require (
|
||||
github.com/portainer/libcrypto v0.0.0-20190723020515-23ebe86ab2c2
|
||||
github.com/portainer/libhttp v0.0.0-20190806161843-ba068f58be33
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
github.com/sirupsen/logrus v1.8.1
|
||||
github.com/stretchr/testify v1.7.0
|
||||
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
||||
|
||||
@@ -186,7 +186,6 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/koding/websocketproxy v0.0.0-20181220232114-7ed82d81a28c h1:N7A4JCA2G+j5fuFxCsJqjFU/sZe0mj8H0sSoSwbaikw=
|
||||
github.com/koding/websocketproxy v0.0.0-20181220232114-7ed82d81a28c/go.mod h1:Nn5wlyECw3iJrzi0AhIWg+AJUb4PlRQVW4/3XHH1LZA=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
@@ -269,8 +268,9 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG
|
||||
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
|
||||
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
|
||||
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
|
||||
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
|
||||
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
|
||||
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
|
||||
github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
|
||||
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
||||
|
||||
56
api/http/handler/endpoints/endpoint_association_delete.go
Normal file
56
api/http/handler/endpoints/endpoint_association_delete.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package endpoints
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/http"
|
||||
|
||||
httperror "github.com/portainer/libhttp/error"
|
||||
"github.com/portainer/libhttp/request"
|
||||
"github.com/portainer/libhttp/response"
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
bolterrors "github.com/portainer/portainer/api/bolt/errors"
|
||||
)
|
||||
|
||||
// @id EndpointAssociationDelete
|
||||
// @summary De-association an edge endpoint
|
||||
// @description De-association an edge endpoint.
|
||||
// @description **Access policy**: administrator
|
||||
// @security jwt
|
||||
// @tags endpoints
|
||||
// @produce json
|
||||
// @param id path int true "Endpoint identifier"
|
||||
// @success 200 {object} portainer.Endpoint "Success"
|
||||
// @failure 400 "Invalid request"
|
||||
// @failure 404 "Endpoint not found"
|
||||
// @failure 500 "Server error"
|
||||
// @router /api/endpoints/:id/association [put]
|
||||
func (handler *Handler) endpointAssociationDelete(w http.ResponseWriter, r *http.Request) *httperror.HandlerError {
|
||||
endpointID, err := request.RetrieveNumericRouteVariableValue(r, "id")
|
||||
if err != nil {
|
||||
return &httperror.HandlerError{http.StatusBadRequest, "Invalid endpoint identifier route variable", err}
|
||||
}
|
||||
|
||||
endpoint, err := handler.DataStore.Endpoint().Endpoint(portainer.EndpointID(endpointID))
|
||||
if err == bolterrors.ErrObjectNotFound {
|
||||
return &httperror.HandlerError{http.StatusNotFound, "Unable to find an endpoint with the specified identifier inside the database", err}
|
||||
} else if err != nil {
|
||||
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find an endpoint with the specified identifier inside the database", err}
|
||||
}
|
||||
|
||||
if endpoint.Type != portainer.EdgeAgentOnKubernetesEnvironment && endpoint.Type != portainer.EdgeAgentOnDockerEnvironment {
|
||||
return &httperror.HandlerError{http.StatusBadRequest, "Invalid endpoint type", errors.New("Invalid endpoint type")}
|
||||
}
|
||||
|
||||
endpoint.EdgeID = ""
|
||||
endpoint.Snapshots = []portainer.DockerSnapshot{}
|
||||
endpoint.Kubernetes.Snapshots = []portainer.KubernetesSnapshot{}
|
||||
|
||||
err = handler.DataStore.Endpoint().UpdateEndpoint(portainer.EndpointID(endpointID), endpoint)
|
||||
if err != nil {
|
||||
return &httperror.HandlerError{http.StatusInternalServerError, "Failed persisting endpoint in database", err}
|
||||
}
|
||||
|
||||
handler.ReverseTunnelService.SetTunnelStatusToIdle(endpoint.ID)
|
||||
|
||||
return response.JSON(w, endpoint)
|
||||
}
|
||||
@@ -132,8 +132,18 @@ func getDockerHubLimits(httpClient *client.HTTPClient, token string) (*dockerhub
|
||||
return nil, errors.New("failed fetching dockerhub limits")
|
||||
}
|
||||
|
||||
// An error with rateLimit-Limit or RateLimit-Remaining is likely for dockerhub pro accounts where there is no rate limit.
|
||||
// In that specific case the headers will not be present. Don't bubble up the error as its normal
|
||||
// See: https://docs.docker.com/docker-hub/download-rate-limit/
|
||||
rateLimit, err := parseRateLimitHeader(resp.Header, "RateLimit-Limit")
|
||||
if err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
rateLimitRemaining, err := parseRateLimitHeader(resp.Header, "RateLimit-Remaining")
|
||||
if err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return &dockerhubStatusResponse{
|
||||
Limit: rateLimit,
|
||||
|
||||
@@ -45,6 +45,8 @@ func NewHandler(bouncer *security.RequestBouncer) *Handler {
|
||||
bouncer.AdminAccess(httperror.LoggerHandler(h.endpointCreate))).Methods(http.MethodPost)
|
||||
h.Handle("/endpoints/{id}/settings",
|
||||
bouncer.AdminAccess(httperror.LoggerHandler(h.endpointSettingsUpdate))).Methods(http.MethodPut)
|
||||
h.Handle("/endpoints/{id}/association",
|
||||
bouncer.AdminAccess(httperror.LoggerHandler(h.endpointAssociationDelete))).Methods(http.MethodDelete)
|
||||
h.Handle("/endpoints/snapshot",
|
||||
bouncer.AdminAccess(httperror.LoggerHandler(h.endpointSnapshots))).Methods(http.MethodPost)
|
||||
h.Handle("/endpoints",
|
||||
|
||||
@@ -95,7 +95,7 @@ func (handler *Handler) createKubernetesStackFromFileContent(w http.ResponseWrit
|
||||
doCleanUp := true
|
||||
defer handler.cleanUp(stack, &doCleanUp)
|
||||
|
||||
output, err := handler.deployKubernetesStack(endpoint, payload.StackFileContent, payload.ComposeFormat, payload.Namespace)
|
||||
output, err := handler.deployKubernetesStack(r, endpoint, payload.StackFileContent, payload.ComposeFormat, payload.Namespace)
|
||||
if err != nil {
|
||||
return &httperror.HandlerError{StatusCode: http.StatusInternalServerError, Message: "Unable to deploy Kubernetes stack", Err: err}
|
||||
}
|
||||
@@ -139,7 +139,7 @@ func (handler *Handler) createKubernetesStackFromGitRepository(w http.ResponseWr
|
||||
return &httperror.HandlerError{StatusCode: http.StatusInternalServerError, Message: "Failed to process manifest from Git repository", Err: err}
|
||||
}
|
||||
|
||||
output, err := handler.deployKubernetesStack(endpoint, stackFileContent, payload.ComposeFormat, payload.Namespace)
|
||||
output, err := handler.deployKubernetesStack(r, endpoint, stackFileContent, payload.ComposeFormat, payload.Namespace)
|
||||
if err != nil {
|
||||
return &httperror.HandlerError{StatusCode: http.StatusInternalServerError, Message: "Unable to deploy Kubernetes stack", Err: err}
|
||||
}
|
||||
@@ -155,7 +155,7 @@ func (handler *Handler) createKubernetesStackFromGitRepository(w http.ResponseWr
|
||||
return response.JSON(w, resp)
|
||||
}
|
||||
|
||||
func (handler *Handler) deployKubernetesStack(endpoint *portainer.Endpoint, stackConfig string, composeFormat bool, namespace string) (string, error) {
|
||||
func (handler *Handler) deployKubernetesStack(request *http.Request, endpoint *portainer.Endpoint, stackConfig string, composeFormat bool, namespace string) (string, error) {
|
||||
handler.stackCreationMutex.Lock()
|
||||
defer handler.stackCreationMutex.Unlock()
|
||||
|
||||
@@ -167,7 +167,7 @@ func (handler *Handler) deployKubernetesStack(endpoint *portainer.Endpoint, stac
|
||||
stackConfig = string(convertedConfig)
|
||||
}
|
||||
|
||||
return handler.KubernetesDeployer.Deploy(endpoint, stackConfig, namespace)
|
||||
return handler.KubernetesDeployer.Deploy(request, endpoint, stackConfig, namespace)
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"github.com/gorilla/websocket"
|
||||
httperror "github.com/portainer/libhttp/error"
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
"github.com/portainer/portainer/api/http/proxy/factory/kubernetes"
|
||||
"github.com/portainer/portainer/api/http/security"
|
||||
"github.com/portainer/portainer/api/kubernetes/cli"
|
||||
)
|
||||
@@ -12,20 +13,22 @@ import (
|
||||
// Handler is the HTTP handler used to handle websocket operations.
|
||||
type Handler struct {
|
||||
*mux.Router
|
||||
DataStore portainer.DataStore
|
||||
SignatureService portainer.DigitalSignatureService
|
||||
ReverseTunnelService portainer.ReverseTunnelService
|
||||
KubernetesClientFactory *cli.ClientFactory
|
||||
requestBouncer *security.RequestBouncer
|
||||
connectionUpgrader websocket.Upgrader
|
||||
DataStore portainer.DataStore
|
||||
SignatureService portainer.DigitalSignatureService
|
||||
ReverseTunnelService portainer.ReverseTunnelService
|
||||
KubernetesClientFactory *cli.ClientFactory
|
||||
requestBouncer *security.RequestBouncer
|
||||
connectionUpgrader websocket.Upgrader
|
||||
kubernetesTokenCacheManager *kubernetes.TokenCacheManager
|
||||
}
|
||||
|
||||
// NewHandler creates a handler to manage websocket operations.
|
||||
func NewHandler(bouncer *security.RequestBouncer) *Handler {
|
||||
func NewHandler(kubernetesTokenCacheManager *kubernetes.TokenCacheManager, bouncer *security.RequestBouncer) *Handler {
|
||||
h := &Handler{
|
||||
Router: mux.NewRouter(),
|
||||
connectionUpgrader: websocket.Upgrader{},
|
||||
requestBouncer: bouncer,
|
||||
kubernetesTokenCacheManager: kubernetesTokenCacheManager,
|
||||
}
|
||||
h.PathPrefix("/websocket/exec").Handler(
|
||||
bouncer.AuthenticatedAccess(httperror.LoggerHandler(h.websocketExec)))
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package websocket
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/portainer/portainer/api/http/security"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
@@ -11,6 +13,7 @@ import (
|
||||
"github.com/portainer/libhttp/request"
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
bolterrors "github.com/portainer/portainer/api/bolt/errors"
|
||||
"github.com/portainer/portainer/api/http/proxy/factory/kubernetes"
|
||||
)
|
||||
|
||||
// @summary Execute a websocket on pod
|
||||
@@ -70,8 +73,14 @@ func (handler *Handler) websocketPodExec(w http.ResponseWriter, r *http.Request)
|
||||
return &httperror.HandlerError{http.StatusForbidden, "Permission denied to access endpoint", err}
|
||||
}
|
||||
|
||||
token, useAdminToken, err := handler.getToken(r, endpoint, false)
|
||||
if err != nil {
|
||||
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to get user service account token", err}
|
||||
}
|
||||
|
||||
params := &webSocketRequestParams{
|
||||
endpoint: endpoint,
|
||||
token: token,
|
||||
}
|
||||
|
||||
r.Header.Del("Origin")
|
||||
@@ -112,7 +121,7 @@ func (handler *Handler) websocketPodExec(w http.ResponseWriter, r *http.Request)
|
||||
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to create Kubernetes client", err}
|
||||
}
|
||||
|
||||
err = cli.StartExecProcess(namespace, podName, containerName, commandArray, stdinReader, stdoutWriter)
|
||||
err = cli.StartExecProcess(token, useAdminToken, namespace, podName, containerName, commandArray, stdinReader, stdoutWriter)
|
||||
if err != nil {
|
||||
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to start exec process inside container", err}
|
||||
}
|
||||
@@ -124,3 +133,37 @@ func (handler *Handler) websocketPodExec(w http.ResponseWriter, r *http.Request)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (handler *Handler) getToken(request *http.Request, endpoint *portainer.Endpoint, setLocalAdminToken bool) (string, bool, error) {
|
||||
tokenData, err := security.RetrieveTokenData(request)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
|
||||
kubecli, err := handler.KubernetesClientFactory.GetKubeClient(endpoint)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
|
||||
tokenCache := handler.kubernetesTokenCacheManager.GetOrCreateTokenCache(int(endpoint.ID))
|
||||
|
||||
tokenManager, err := kubernetes.NewTokenManager(kubecli, handler.DataStore, tokenCache, setLocalAdminToken)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
|
||||
if tokenData.Role == portainer.AdministratorRole {
|
||||
return tokenManager.GetAdminServiceAccountToken(), true, nil
|
||||
}
|
||||
|
||||
token, err := tokenManager.GetUserServiceAccountToken(int(tokenData.ID), endpoint.ID)
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
|
||||
if token == "" {
|
||||
return "", false, fmt.Errorf("can not get a valid user service account token")
|
||||
}
|
||||
|
||||
return token, false, nil
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ func (handler *Handler) proxyEdgeAgentWebsocketRequest(w http.ResponseWriter, r
|
||||
|
||||
proxy.Director = func(incoming *http.Request, out http.Header) {
|
||||
out.Set(portainer.PortainerAgentTargetHeader, params.nodeName)
|
||||
out.Set(portainer.PortainerAgentKubernetesSATokenHeader, params.token)
|
||||
}
|
||||
|
||||
handler.ReverseTunnelService.SetTunnelStatusToActive(params.endpoint.ID)
|
||||
@@ -64,6 +65,7 @@ func (handler *Handler) proxyAgentWebsocketRequest(w http.ResponseWriter, r *htt
|
||||
out.Set(portainer.PortainerAgentPublicKeyHeader, handler.SignatureService.EncodedPublicKey())
|
||||
out.Set(portainer.PortainerAgentSignatureHeader, signature)
|
||||
out.Set(portainer.PortainerAgentTargetHeader, params.nodeName)
|
||||
out.Set(portainer.PortainerAgentKubernetesSATokenHeader, params.token)
|
||||
}
|
||||
|
||||
proxy.ServeHTTP(w, r)
|
||||
|
||||
@@ -8,4 +8,5 @@ type webSocketRequestParams struct {
|
||||
ID string
|
||||
nodeName string
|
||||
endpoint *portainer.Endpoint
|
||||
token string
|
||||
}
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
const defaultServiceAccountTokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
|
||||
@@ -13,7 +11,6 @@ type tokenManager struct {
|
||||
tokenCache *tokenCache
|
||||
kubecli portainer.KubeClient
|
||||
dataStore portainer.DataStore
|
||||
mutex sync.Mutex
|
||||
adminToken string
|
||||
}
|
||||
|
||||
@@ -25,7 +22,6 @@ func NewTokenManager(kubecli portainer.KubeClient, dataStore portainer.DataStore
|
||||
tokenCache: cache,
|
||||
kubecli: kubecli,
|
||||
dataStore: dataStore,
|
||||
mutex: sync.Mutex{},
|
||||
adminToken: "",
|
||||
}
|
||||
|
||||
@@ -41,13 +37,13 @@ func NewTokenManager(kubecli portainer.KubeClient, dataStore portainer.DataStore
|
||||
return tokenManager, nil
|
||||
}
|
||||
|
||||
func (manager *tokenManager) getAdminServiceAccountToken() string {
|
||||
func (manager *tokenManager) GetAdminServiceAccountToken() string {
|
||||
return manager.adminToken
|
||||
}
|
||||
|
||||
func (manager *tokenManager) getUserServiceAccountToken(userID int, endpointID portainer.EndpointID) (string, error) {
|
||||
manager.mutex.Lock()
|
||||
defer manager.mutex.Unlock()
|
||||
func (manager *tokenManager) GetUserServiceAccountToken(userID int, endpointID portainer.EndpointID) (string, error) {
|
||||
manager.tokenCache.mutex.Lock()
|
||||
defer manager.tokenCache.mutex.Unlock()
|
||||
|
||||
token, ok := manager.tokenCache.getToken(userID)
|
||||
if !ok {
|
||||
|
||||
@@ -2,6 +2,7 @@ package kubernetes
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/orcaman/concurrent-map"
|
||||
)
|
||||
@@ -14,6 +15,7 @@ type (
|
||||
|
||||
tokenCache struct {
|
||||
userTokenCache cmap.ConcurrentMap
|
||||
mutex sync.Mutex
|
||||
}
|
||||
)
|
||||
|
||||
@@ -35,6 +37,18 @@ func (manager *TokenCacheManager) CreateTokenCache(endpointID int) *tokenCache {
|
||||
return tokenCache
|
||||
}
|
||||
|
||||
// GetOrCreateTokenCache will get the tokenCache from the manager map of caches if it exists,
|
||||
// otherwise it will create a new tokenCache object, associate it to the manager map of caches
|
||||
// and return a pointer to that tokenCache instance.
|
||||
func (manager *TokenCacheManager) GetOrCreateTokenCache(endpointID int) *tokenCache {
|
||||
key := strconv.Itoa(endpointID)
|
||||
if epCache, ok := manager.tokenCaches.Get(key); ok {
|
||||
return epCache.(*tokenCache)
|
||||
}
|
||||
|
||||
return manager.CreateTokenCache(endpointID)
|
||||
}
|
||||
|
||||
// RemoveUserFromCache will ensure that the specific userID is removed from all registered caches.
|
||||
func (manager *TokenCacheManager) RemoveUserFromCache(userID int) {
|
||||
for cache := range manager.tokenCaches.IterBuffered() {
|
||||
@@ -45,6 +59,7 @@ func (manager *TokenCacheManager) RemoveUserFromCache(userID int) {
|
||||
func newTokenCache() *tokenCache {
|
||||
return &tokenCache{
|
||||
userTokenCache: cmap.New(),
|
||||
mutex: sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -110,9 +110,9 @@ func (transport *baseTransport) getRoundTripToken(request *http.Request, tokenMa
|
||||
|
||||
var token string
|
||||
if tokenData.Role == portainer.AdministratorRole {
|
||||
token = tokenManager.getAdminServiceAccountToken()
|
||||
token = tokenManager.GetAdminServiceAccountToken()
|
||||
} else {
|
||||
token, err = tokenManager.getUserServiceAccountToken(int(tokenData.ID), transport.endpoint.ID)
|
||||
token, err = tokenManager.GetUserServiceAccountToken(int(tokenData.ID), transport.endpoint.ID)
|
||||
if err != nil {
|
||||
log.Printf("Failed retrieving service account token: %v", err)
|
||||
return "", err
|
||||
|
||||
@@ -208,7 +208,7 @@ func (server *Server) Start() error {
|
||||
userHandler.DataStore = server.DataStore
|
||||
userHandler.CryptoService = server.CryptoService
|
||||
|
||||
var websocketHandler = websocket.NewHandler(requestBouncer)
|
||||
var websocketHandler = websocket.NewHandler(server.KubernetesTokenCacheManager, requestBouncer)
|
||||
websocketHandler.DataStore = server.DataStore
|
||||
websocketHandler.SignatureService = server.SignatureService
|
||||
websocketHandler.ReverseTunnelService = server.ReverseTunnelService
|
||||
|
||||
@@ -14,13 +14,18 @@ import (
|
||||
// StartExecProcess will start an exec process inside a container located inside a pod inside a specific namespace
|
||||
// using the specified command. The stdin parameter will be bound to the stdin process and the stdout process will write
|
||||
// to the stdout parameter.
|
||||
// This function only works against a local endpoint using an in-cluster config.
|
||||
func (kcl *KubeClient) StartExecProcess(namespace, podName, containerName string, command []string, stdin io.Reader, stdout io.Writer) error {
|
||||
// This function only works against a local endpoint using an in-cluster config with the user's SA token.
|
||||
func (kcl *KubeClient) StartExecProcess(token string, useAdminToken bool, namespace, podName, containerName string, command []string, stdin io.Reader, stdout io.Writer) error {
|
||||
config, err := rest.InClusterConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !useAdminToken {
|
||||
config.BearerToken = token
|
||||
config.BearerTokenFile = ""
|
||||
}
|
||||
|
||||
req := kcl.cli.CoreV1().RESTClient().
|
||||
Post().
|
||||
Resource("pods").
|
||||
|
||||
@@ -2,6 +2,7 @@ package portainer
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
gittypes "github.com/portainer/portainer/api/git/types"
|
||||
@@ -1188,7 +1189,7 @@ type (
|
||||
KubeClient interface {
|
||||
SetupUserServiceAccount(userID int, teamIDs []int, restrictDefaultNamespace bool) error
|
||||
GetServiceAccountBearerToken(userID int) (string, error)
|
||||
StartExecProcess(namespace, podName, containerName string, command []string, stdin io.Reader, stdout io.Writer) error
|
||||
StartExecProcess(token string, useAdminToken bool, namespace, podName, containerName string, command []string, stdin io.Reader, stdout io.Writer) error
|
||||
NamespaceAccessPoliciesDeleteNamespace(namespace string) error
|
||||
GetNamespaceAccessPolicies() (map[string]K8sNamespaceAccessPolicy, error)
|
||||
UpdateNamespaceAccessPolicies(accessPolicies map[string]K8sNamespaceAccessPolicy) error
|
||||
@@ -1199,7 +1200,7 @@ type (
|
||||
|
||||
// KubernetesDeployer represents a service to deploy a manifest inside a Kubernetes endpoint
|
||||
KubernetesDeployer interface {
|
||||
Deploy(endpoint *Endpoint, data string, namespace string) (string, error)
|
||||
Deploy(request *http.Request, endpoint *Endpoint, data string, namespace string) (string, error)
|
||||
ConvertCompose(data string) ([]byte, error)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user