cloudstack-k8s-upgrader/pkg/cloudstack/cloudstack.go

417 lines
13 KiB
Go

// Package cloudstack provides utilities for interacting with a CloudStack
// management server.
package cloudstack
/*
* See https://cloudstack.apache.org/api/apidocs-4.16/ for API docs
*/
import (
"bytes"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sort"
"strings"
"time"
"github.com/rs/zerolog/log"
"git.csclub.uwaterloo.ca/cloud/cloudstack-k8s-upgrader/pkg/config"
)
// CloudstackClient talks to a CloudStack management server through its REST
// API. Every request it builds is derived from the configuration it holds.
type CloudstackClient struct {
	// cfg is the configuration supplied to New.
	cfg *config.Config
}
// New builds a CloudstackClient that uses cfg for all of its requests.
// The pointer is stored as-is; the configuration is not copied.
func New(cfg *config.Config) *CloudstackClient {
	client := new(CloudstackClient)
	client.cfg = cfg
	return client
}
// urlQueryEscape escapes value the way java.net.URLEncoder does, so that the
// string we sign is the same string the server re-encodes when it verifies the
// signature.
//
// CloudStack uses java.net.URLEncoder when calculating signatures:
// https://github.com/apache/cloudstack/blob/main/server/src/main/java/com/cloud/api/ApiServer.java
//
// Go's url.QueryEscape differs from java.net.URLEncoder in two characters:
//   - "~" is left alone by Go but Java transforms it into "%7E". This is a
//     bug: https://bugs.openjdk.org/browse/JDK-8204530
//   - "*" is escaped to "%2A" by Go but Java leaves it unchanged.
//
// Both are adjusted here so that the server does not reject our signature.
// TODO: file a GitHub issue in the cloudmonkey repo
func urlQueryEscape(value string) string {
	escaped := url.QueryEscape(value)
	escaped = strings.ReplaceAll(escaped, "~", "%7E")
	// A literal "%" in value has already become "%25", so every "%2A" in the
	// escaped text is guaranteed to be an escaped "*".
	escaped = strings.ReplaceAll(escaped, "%2A", "*")
	return escaped
}
// encodeRequestParams serialises params as "key=value" pairs joined by "&",
// with the keys in ascending order. Values are escaped with urlQueryEscape;
// keys are written verbatim. A nil map yields the empty string.
//
// Adapted from https://github.com/apache/cloudstack-cloudmonkey/blob/main/cmd/network.go
func encodeRequestParams(params map[string]string) string {
	if params == nil {
		return ""
	}

	// Map iteration order is random, so collect and sort the keys first.
	names := make([]string, 0, len(params))
	for name := range params {
		names = append(names, name)
	}
	sort.Strings(names)

	var query bytes.Buffer
	for idx, name := range names {
		if idx > 0 {
			query.WriteByte('&')
		}
		query.WriteString(name + "=" + urlQueryEscape(params[name]))
	}
	return query.String()
}
// createURL returns the full, signed request URL for the API call described
// by params.
//
// The "apiKey" and "response" ("json") parameters are added to a private copy
// of params, so the caller's map is never modified and a nil map is accepted.
// The signature is the base64-encoded HMAC-SHA1 (keyed with the configured
// secret key) of the sorted, escaped, lower-cased query string in which "+" is
// replaced by "%20"; it is appended as the "signature" parameter.
//
// See http://docs.cloudstack.apache.org/en/4.16.0.0/developersguide/dev.html#how-to-sign-an-api-call-with-python
func (client *CloudstackClient) createURL(params map[string]string) string {
	cfg := client.cfg

	// Work on a copy: writing into the caller's map would leak our extra
	// parameters back to the caller, and writing into a nil map panics.
	signedParams := make(map[string]string, len(params)+2)
	for key, value := range params {
		signedParams[key] = value
	}
	signedParams["apiKey"] = cfg.CloudstackApiKey
	signedParams["response"] = "json"

	// adapted from https://github.com/apache/cloudstack-cloudmonkey/blob/main/cmd/network.go
	encodedParams := encodeRequestParams(signedParams)
	mac := hmac.New(sha1.New, []byte(cfg.CloudstackSecretKey))
	mac.Write([]byte(strings.ReplaceAll(strings.ToLower(encodedParams), "+", "%20")))
	signature := base64.StdEncoding.EncodeToString(mac.Sum(nil))

	return fmt.Sprintf("%s?%s&signature=%s", cfg.CloudstackApiBaseUrl, encodedParams, urlQueryEscape(signature))
}
// getDeserializedResponse performs an HTTP GET on url and decodes the JSON
// response body into unmarshaledValue, which must be a pointer that
// json.Unmarshal can fill.
//
// The function panics if the request cannot be created or sent, if the body
// cannot be read, if the server answers with a non-2xx status, or if the body
// cannot be unmarshaled into the target. The status check matters because an
// error payload whose top-level key differs from the one the caller expects
// would otherwise decode into zero values and be mistaken for success.
func getDeserializedResponse(url string, unmarshaledValue interface{}) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	buf, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		// Report the body rather than the URL: the body is where the server
		// describes the failure, while the URL carries the API key.
		panic(fmt.Sprintf("HTTP request failed with status %s: %s", resp.Status, buf))
	}
	if err = json.Unmarshal(buf, unmarshaledValue); err != nil {
		panic(err)
	}
}
// ErrorInfo holds the error fields of an API response. It is embedded in the
// response types of this package; a zero ErrorCode is treated as "no error"
// by checkErrorInfo.
type ErrorInfo struct {
	ErrorCode int    `json:"errorcode"`
	ErrorText string `json:"errortext"`
}
// checkErrorInfo panics with the error code and text when data carries a
// non-zero error code, and does nothing otherwise.
func checkErrorInfo(data *ErrorInfo) {
	if data.ErrorCode == 0 {
		return
	}
	panic(fmt.Sprintf("Error %d: %s", data.ErrorCode, data.ErrorText))
}
// KubernetesSupportedVersionResponse is one Kubernetes version entry as
// returned by the supported-version API calls.
//
// See https://cloudstack.apache.org/api/apidocs-4.16/apis/listKubernetesSupportedVersions.html
type KubernetesSupportedVersionResponse struct {
	Id    string `json:"id"`
	IsoId string `json:"isoid"`
	// e.g. "1.22.2-Kubernetes-Binaries-ISO"
	IsoName string `json:"isoname"`
	// e.g. "Ready"
	IsoState     string `json:"isostate"`
	MinCpuNumber int    `json:"mincpunumber"`
	MinMemory    int    `json:"minmemory"`
	Name         string `json:"name"`
	// e.g. "1.22.2"
	SemanticVersion string `json:"semanticversion"`
	// e.g. "Enabled"
	State               string `json:"state"`
	SupportsAutoscaling bool   `json:"supportsautoscaling"`
	SupportsHA          bool   `json:"supportsha"`
}
// ListKubernetesSupportedVersionsResponse is the payload of a
// listKubernetesSupportedVersions call: the embedded error fields, the
// reported number of versions, and the versions themselves.
type ListKubernetesSupportedVersionsResponse struct {
	ErrorInfo
	Count                      int                                  `json:"count"`
	KubernetesSupportedVersion []KubernetesSupportedVersionResponse `json:"kubernetessupportedversion"`
}
// listKubernetesSupportedVersionsById runs the listKubernetesSupportedVersions
// command. When id is non-nil its value is sent as the "id" parameter;
// when id is nil no such parameter is sent.
//
// It panics if the request fails or the response carries an error code.
func (client *CloudstackClient) listKubernetesSupportedVersionsById(id *string) *ListKubernetesSupportedVersionsResponse {
	query := map[string]string{"command": "listKubernetesSupportedVersions"}
	if id != nil {
		query["id"] = *id
	}

	// The payload is nested under a command-specific top-level key.
	var envelope struct {
		Response ListKubernetesSupportedVersionsResponse `json:"listkubernetessupportedversionsresponse"`
	}
	getDeserializedResponse(client.createURL(query), &envelope)
	checkErrorInfo(&envelope.Response.ErrorInfo)
	return &envelope.Response
}
// ListKubernetesSupportedVersions runs the listKubernetesSupportedVersions
// command without an "id" parameter and returns the decoded response.
// It panics under the same conditions as listKubernetesSupportedVersionsById.
func (client *CloudstackClient) ListKubernetesSupportedVersions() *ListKubernetesSupportedVersionsResponse {
	var noFilter *string
	return client.listKubernetesSupportedVersionsById(noFilter)
}
// AddKubernetesSupportedVersionResponse is the payload of an
// addKubernetesSupportedVersion call: the embedded error fields plus the
// version record under the "kubernetessupportedversion" key.
type AddKubernetesSupportedVersionResponse struct {
	ErrorInfo
	KubernetesSupportedVersion KubernetesSupportedVersionResponse `json:"kubernetessupportedversion"`
}
// AddKubernetesSupportedVersion registers a Kubernetes version through the
// addKubernetesSupportedVersion command and waits until its ISO state is
// "Ready".
//
// semver is sent as both the "semanticversion" and the "name" parameter, and
// downloadURL as the "url" parameter. "mincpunumber" and "minmemory" are fixed
// at "2" and "2048".
//
// While the ISO is not ready the version is re-queried by ID every 20 seconds,
// for at most 10 minutes. The returned record is the most recent one observed,
// so its IsoState is "Ready" rather than the state from the initial response.
//
// The function panics if a request fails, if the server reports an error, if a
// poll does not yield exactly one version, or if the ISO does not become ready
// in time.
func (client *CloudstackClient) AddKubernetesSupportedVersion(semver string, downloadURL string) *KubernetesSupportedVersionResponse {
	url := client.createURL(map[string]string{
		"command":         "addKubernetesSupportedVersion",
		"mincpunumber":    "2",
		"minmemory":       "2048",
		"semanticversion": semver,
		"name":            semver,
		"url":             downloadURL,
	})
	responseWrapper := struct {
		Response AddKubernetesSupportedVersionResponse `json:"addkubernetessupportedversionresponse"`
	}{}
	getDeserializedResponse(url, &responseWrapper)
	data := &responseWrapper.Response
	checkErrorInfo(&data.ErrorInfo)

	version := &data.KubernetesSupportedVersion
	// Keep the ID in its own variable so polling always asks for the version
	// that was just created, whatever a poll result contains.
	versionID := version.Id

	const maxTimeToWait = 10 * time.Minute
	const retryInterval = 20 * time.Second
	const maxTries = int(maxTimeToWait / retryInterval)
	for i := 0; i < maxTries; i++ {
		log.Debug().
			Str("id", versionID).
			Str("isoState", version.IsoState).
			Msg("Waiting for ISO to become ready...")
		if version.IsoState == "Ready" {
			break
		}
		time.Sleep(retryInterval)
		versionsById := client.listKubernetesSupportedVersionsById(&versionID)
		if versionsById.Count != 1 {
			panic(fmt.Sprintf("Expected to get 1 version for ISO, got %d", versionsById.Count))
		}
		// Do not rely on Count alone before indexing the slice.
		if n := len(versionsById.KubernetesSupportedVersion); n != 1 {
			panic(fmt.Sprintf("Expected to get 1 version for ISO, got %d", n))
		}
		// Keep the refreshed record so the caller does not receive the
		// stale state from the initial add response.
		*version = versionsById.KubernetesSupportedVersion[0]
	}
	if version.IsoState != "Ready" {
		panic("Timed out waiting for ISO to become ready")
	}
	return version
}
// ListDomainsResponse is the payload of a listDomains call: the embedded
// error fields, the reported number of domains, and the name and ID of each
// domain.
type ListDomainsResponse struct {
	ErrorInfo
	Count  int `json:"count"`
	Domain []struct {
		Name string `json:"name"`
		Id   string `json:"id"`
	} `json:"domain"`
}
// GetDomainId looks up domainName with the listDomains command (requesting
// "min" details) and returns the ID of the single domain found.
//
// It panics if the request fails, if the response carries an error code, or
// if the reported number of matching domains is not exactly one.
func (client *CloudstackClient) GetDomainId(domainName string) string {
	query := map[string]string{
		"command": "listDomains",
		"details": "min",
		"name":    domainName,
	}
	var envelope struct {
		Response ListDomainsResponse `json:"listdomainsresponse"`
	}
	getDeserializedResponse(client.createURL(query), &envelope)

	domains := &envelope.Response
	checkErrorInfo(&domains.ErrorInfo)
	if domains.Count == 1 {
		return domains.Domain[0].Id
	}
	panic("there should be one domain found")
}
// KubernetesClusterResponse is one cluster entry of a listKubernetesClusters
// response.
//
// See https://cloudstack.apache.org/api/apidocs-4.16/apis/listKubernetesClusters.html
// Fields in which we are not interested have been omitted
type KubernetesClusterResponse struct {
	Id      string `json:"id"`
	Account string `json:"account"`
	// Domain is decoded from the "domain" key. It was previously tagged
	// "string", a key the API does not send, so it was always left empty.
	Domain                string `json:"domain"`
	DomainId              string `json:"domainid"`
	Endpoint              string `json:"endpoint"`
	KubernetesVersionId   string `json:"kubernetesversionid"`
	KubernetesVersionName string `json:"kubernetesversionname"`
	Name                  string `json:"name"`
	ServiceOfferingId     string `json:"serviceofferingid"`
	ServiceOfferingName   string `json:"serviceofferingname"`
	State                 string `json:"state"`
	TemplateId            string `json:"templateid"`
}
// ListKubernetesClustersResponse is the payload of a listKubernetesClusters
// call: the embedded error fields, the reported number of clusters, and the
// clusters themselves.
type ListKubernetesClustersResponse struct {
	ErrorInfo
	Count             int                         `json:"count"`
	KubernetesCluster []KubernetesClusterResponse `json:"kubernetescluster"`
}
// listKubernetesClusters runs the listKubernetesClusters command with the
// given "name" and "domainid" parameters and returns the decoded response.
//
// It panics if the request fails or the response carries an error code.
func (client *CloudstackClient) listKubernetesClusters(name string, domainId string) *ListKubernetesClustersResponse {
	query := map[string]string{
		"command":  "listKubernetesClusters",
		"name":     name,
		"domainid": domainId,
	}
	var envelope struct {
		Response ListKubernetesClustersResponse `json:"listkubernetesclustersresponse"`
	}
	getDeserializedResponse(client.createURL(query), &envelope)
	checkErrorInfo(&envelope.Response.ErrorInfo)
	return &envelope.Response
}
// GetKubernetesCluster returns the cluster called name, searching within the
// domain whose name is "ROOT".
//
// It panics if either lookup fails or if the reported number of matching
// clusters is not exactly one.
func (client *CloudstackClient) GetKubernetesCluster(name string) *KubernetesClusterResponse {
	rootDomainID := client.GetDomainId("ROOT")
	found := client.listKubernetesClusters(name, rootDomainID)
	if n := found.Count; n != 1 {
		panic(fmt.Sprintf("Expected to find one k8s cluster, found %d instead", n))
	}
	return &found.KubernetesCluster[0]
}
// QueryAsyncJobResultResponse is the payload of a queryAsyncJobResult call.
// WaitForJobToComplete reads JobStatus to decide whether the job is pending
// (0), succeeded (1) or failed (anything else).
type QueryAsyncJobResultResponse struct {
	ErrorInfo
	Created       string `json:"created"`
	Completed     string `json:"completed"`
	JobStatus     int    `json:"jobstatus"`
	JobResultCode int    `json:"jobresultcode"`
	JobResult     struct {
		// These fields are mutually exclusive
		ErrorText string `json:"errortext"`
		Success   bool   `json:"success"`
	} `json:"jobresult"`
}
// JobFailedError reports an asynchronous job that finished with a status
// other than success.
type JobFailedError struct {
	// JobID is the ID of the job that was being waited on.
	JobID string
	// Status is the job status reported by the server.
	Status int
	// ResultCode is the job result code reported by the server.
	ResultCode int
	// ErrorText is the error text from the job result; it may be empty.
	ErrorText string
}

// Error implements the error interface. The message always names the job and
// its status; the result code and the server's error text are appended when
// they are set, so that a failure can be diagnosed from the message alone.
func (e *JobFailedError) Error() string {
	msg := fmt.Sprintf("Job %s has status code %d", e.JobID, e.Status)
	if e.ResultCode != 0 {
		msg += fmt.Sprintf(", result code %d", e.ResultCode)
	}
	if e.ErrorText != "" {
		msg += ": " + e.ErrorText
	}
	return msg
}
// WaitForJobToComplete polls queryAsyncJobResult for jobID until the job
// leaves the pending state.
//
// The job is queried every 15 seconds for up to one hour. A job status of 0
// means "still pending" and triggers another poll; 1 means success and yields
// a nil error; any other status yields a *JobFailedError. If the job is still
// pending after the last attempt, a plain error is returned instead.
//
// It panics if a request fails or a response carries an error code.
func (client *CloudstackClient) WaitForJobToComplete(jobID string) error {
	// The query is the same on every poll, so the URL is built only once.
	url := client.createURL(map[string]string{
		"command": "queryAsyncJobResult",
		"jobid":   jobID,
	})

	const maxTimeToWait = 1 * time.Hour
	const retryInterval = 15 * time.Second
	const maxTries = int(maxTimeToWait / retryInterval)

	for attempt := 0; attempt < maxTries; attempt++ {
		var envelope struct {
			Response QueryAsyncJobResultResponse `json:"queryasyncjobresultresponse"`
		}
		getDeserializedResponse(url, &envelope)
		result := &envelope.Response
		checkErrorInfo(&result.ErrorInfo)

		switch result.JobStatus {
		case 0:
			log.Debug().Str("id", jobID).Msg("job is still pending")
			time.Sleep(retryInterval)
		case 1:
			log.Debug().Str("id", jobID).Msg("job completed successfully")
			return nil
		default:
			// either an error, or an unknown status code
			// see https://github.com/apache/cloudstack-cloudmonkey/blob/main/cmd/network.go
			return &JobFailedError{
				JobID:      jobID,
				Status:     result.JobStatus,
				ResultCode: result.JobResultCode,
				ErrorText:  result.JobResult.ErrorText,
			}
		}
	}
	return fmt.Errorf("Job %s took too long", jobID)
}
// GenericAsyncResponse is the immediate payload of a command that starts an
// asynchronous job: the embedded error fields plus the ID of the job, which
// can be passed to WaitForJobToComplete.
type GenericAsyncResponse struct {
	ErrorInfo
	JobId string `json:"jobid"`
}
// UpgradeKubernetesCluster runs the upgradeKubernetesCluster command for the
// cluster clusterID with the target version kubernetesVersionID, then waits
// for the resulting job.
//
// The returned error is whatever WaitForJobToComplete reports. The function
// panics if the initial request fails or its response carries an error code.
func (client *CloudstackClient) UpgradeKubernetesCluster(clusterID string, kubernetesVersionID string) error {
	query := map[string]string{
		"command":             "upgradeKubernetesCluster",
		"id":                  clusterID,
		"kubernetesversionid": kubernetesVersionID,
	}
	var envelope struct {
		Response GenericAsyncResponse `json:"upgradekubernetesclusterresponse"`
	}
	getDeserializedResponse(client.createURL(query), &envelope)
	checkErrorInfo(&envelope.Response.ErrorInfo)
	return client.WaitForJobToComplete(envelope.Response.JobId)
}
// StopKubernetesCluster runs the stopKubernetesCluster command for the cluster
// clusterID and waits for the resulting job.
//
// It panics if the request fails, if the response carries an error code, or
// if WaitForJobToComplete returns an error.
func (client *CloudstackClient) StopKubernetesCluster(clusterID string) {
	query := map[string]string{
		"command": "stopKubernetesCluster",
		"id":      clusterID,
	}
	var envelope struct {
		Response GenericAsyncResponse `json:"stopkubernetesclusterresponse"`
	}
	getDeserializedResponse(client.createURL(query), &envelope)
	checkErrorInfo(&envelope.Response.ErrorInfo)
	if err := client.WaitForJobToComplete(envelope.Response.JobId); err != nil {
		panic(err)
	}
}
// StartKubernetesCluster runs the startKubernetesCluster command for the
// cluster clusterID and waits for the resulting job.
//
// It panics if the request fails, if the response carries an error code, or
// if WaitForJobToComplete returns an error.
func (client *CloudstackClient) StartKubernetesCluster(clusterID string) {
	query := map[string]string{
		"command": "startKubernetesCluster",
		"id":      clusterID,
	}
	var envelope struct {
		Response GenericAsyncResponse `json:"startkubernetesclusterresponse"`
	}
	getDeserializedResponse(client.createURL(query), &envelope)
	checkErrorInfo(&envelope.Response.ErrorInfo)
	if err := client.WaitForJobToComplete(envelope.Response.JobId); err != nil {
		panic(err)
	}
}
// DeleteKubernetesSupportedVersionResponse is the immediate payload of a
// deleteKubernetesSupportedVersion call: the embedded error fields plus the
// ID of the job that performs the deletion.
type DeleteKubernetesSupportedVersionResponse struct {
	ErrorInfo
	JobId string `json:"jobid"`
}
// DeleteKubernetesSupportedVersion runs the deleteKubernetesSupportedVersion
// command for the version kubernetesVersionID, then waits for the resulting
// job.
//
// The returned error is whatever WaitForJobToComplete reports. The function
// panics if the initial request fails or its response carries an error code.
func (client *CloudstackClient) DeleteKubernetesSupportedVersion(kubernetesVersionID string) error {
	query := map[string]string{
		"command": "deleteKubernetesSupportedVersion",
		"id":      kubernetesVersionID,
	}
	var envelope struct {
		Response DeleteKubernetesSupportedVersionResponse `json:"deletekubernetessupportedversionresponse"`
	}
	getDeserializedResponse(client.createURL(query), &envelope)
	checkErrorInfo(&envelope.Response.ErrorInfo)
	return client.WaitForJobToComplete(envelope.Response.JobId)
}