From 03823f554fa1beba8b4a2f6a63aabb1b4f6f9422 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Sun, 15 Oct 2017 20:31:47 -0700 Subject: [PATCH 1/2] WIP: Allow deployment file to be loaded from configdb --- api.go | 2 +- clients/deployer.go | 69 +++++++++++++++++++++++++++++++++------ clients/influx_client.go | 1 - db/db.go | 19 +++++++++++ documents/template.config | 3 +- jobs/clusters.go | 63 +++++++++++++++++++++++++---------- jobs/jobs.go | 5 +-- models/models.go | 1 + runners/benchmark.go | 1 - 9 files changed, 130 insertions(+), 34 deletions(-) diff --git a/api.go b/api.go index eb60cf2..aeb18f6 100644 --- a/api.go +++ b/api.go @@ -82,7 +82,7 @@ func (server *Server) StartServer() error { router.GET("/state/:runId", server.state) - jobManager, err := jobs.NewJobManager(server.Config) + jobManager, err := jobs.NewJobManager(server.Config, server.ConfigDB) if err != nil { return errors.New("Unable to create job manager: " + err.Error()) } diff --git a/clients/deployer.go b/clients/deployer.go index 0bcc715..2da79a9 100644 --- a/clients/deployer.go +++ b/clients/deployer.go @@ -360,12 +360,61 @@ func (client *DeployerClient) DeleteDeployment(deploymentId string, log *logging } func (client *DeployerClient) CreateDeployment( + deployment *deployer.Deployment, + loadTesterName string, + log *logging.Logger) (string, error) { + requestUrl := UrlBasePath(client.Url) + path.Join( + client.Url.Path, "v1", "deployments") + + response, err := resty.R().SetBody(deployment).Post(requestUrl) + if err != nil { + return "", err + } + + if response.StatusCode() != 202 { + return "", fmt.Errorf("Invalid status code returned %d: %s", response.StatusCode(), response.String()) + } + + var createResponse struct { + Error bool `json:"error"` + Data string `json:"data` + DeploymentId string `json:"deploymentId` + } + + if err := json.Unmarshal(response.Body(), &createResponse); err != nil { + return "", errors.New("Unable to parse failed create deployment response: " + err.Error()) + } + + log.Debugf("Received deployer response: %+v", createResponse) + if createResponse.Error { + return "", errors.New("Unable to create deployment: " + createResponse.Data) + } + + deploymentId := createResponse.DeploymentId + if deploymentId == "" { + return "", errors.New("Unable to get deployment id") + } + + log.Infof("Waiting for deployment %s to be available...", deploymentId) + if err := client.waitUntilDeploymentStateAvailable(deploymentId, log); err != nil { + return "", errors.New("Unable to waiting for deployment state to be available: " + err.Error()) + } + + log.Infof("Waiting for load tester %s service url to be available...", loadTesterName) + if err := client.waitUntilServiceUrlAvailable(deploymentId, loadTesterName, log); err != nil { + return "", fmt.Errorf("Unable to waiting for %s url to be available: %s", loadTesterName, err) + } + + return deploymentId, nil +} + +func (client *DeployerClient) CreateDeploymentWithTemplate( deploymentTemplate string, deployment *deployer.Deployment, loadTesterName string, - log *logging.Logger) (*string, error) { + log *logging.Logger) (string, error) { if deploymentTemplate == "" { - return nil, errors.New("Empty deployment template found") + return "", errors.New("Empty deployment template found") } requestUrl := UrlBasePath(client.Url) + path.Join( @@ -373,11 +422,11 @@ func (client *DeployerClient) CreateDeployment( response, err := resty.R().SetBody(deployment).Post(requestUrl) if err != nil { - return nil, err + return "", err } if response.StatusCode() != 202 { - return nil, fmt.Errorf("Invalid status code returned %d: %s", response.StatusCode(), response.String()) + return "", fmt.Errorf("Invalid status code returned %d: %s", response.StatusCode(), response.String()) } var createResponse struct { @@ -387,30 +436,30 @@ func (client *DeployerClient) CreateDeployment( } if err := json.Unmarshal(response.Body(), &createResponse); err != nil { - return nil, errors.New("Unable to parse failed create deployment response: " + err.Error()) + return "", errors.New("Unable to parse failed create deployment response: " + err.Error()) } log.Debugf("Received deployer response: %+v", createResponse) if createResponse.Error { - return nil, errors.New("Unable to create deployment: " + createResponse.Data) + return "", errors.New("Unable to create deployment: " + createResponse.Data) } deploymentId := createResponse.DeploymentId if deploymentId == "" { - return nil, errors.New("Unable to get deployment id") + return "", errors.New("Unable to get deployment id") } log.Infof("Waiting for deployment %s to be available...", deploymentId) if err := client.waitUntilDeploymentStateAvailable(deploymentId, log); err != nil { - return nil, errors.New("Unable to waiting for deployment state to be available: " + err.Error()) + return "", errors.New("Unable to waiting for deployment state to be available: " + err.Error()) } log.Infof("Waiting for load tester %s service url to be available...", loadTesterName) if err := client.waitUntilServiceUrlAvailable(deploymentId, loadTesterName, log); err != nil { - return nil, fmt.Errorf("Unable to waiting for %s url to be available: %s", loadTesterName, err) + return "", fmt.Errorf("Unable to waiting for %s url to be available: %s", loadTesterName, err) } - return &deploymentId, nil + return deploymentId, nil } func (client *DeployerClient) waitUntilServiceUrlAvailable( diff --git a/clients/influx_client.go b/clients/influx_client.go index 58214d2..1bfa247 100644 --- a/clients/influx_client.go +++ b/clients/influx_client.go @@ -1,7 +1,6 @@ package clients import ( - "bytes" "fmt" "os/exec" ) diff --git a/db/db.go b/db/db.go index db2fb3f..fceac2a 100644 --- a/db/db.go +++ b/db/db.go @@ -8,6 +8,7 @@ import ( "gopkg.in/mgo.v2/bson" "github.com/golang/glog" + deployer "github.com/hyperpilotio/deployer/apis" "github.com/hyperpilotio/workload-profiler/models" "github.com/spf13/viper" ) @@ -19,6 +20,7 @@ type ConfigDB struct { Database string ApplicationsCollection string BenchmarksCollection string + DeploymentCollection string NodeTypeCollection string PreviousGenerationCollection string } @@ -79,6 +81,23 @@ func (configDb *ConfigDB) GetApplicationConfig(name string) (*models.Application return &appConfig, nil } +func (configDb *ConfigDB) GetDeploymentConfig(name string) (*deployer.Deployment, error) { + session, sessionErr := connectMongo(configDb.Url, configDb.Database, configDb.User, configDb.Password) + if sessionErr != nil { + return nil, errors.New("Unable to create mongo session: " + sessionErr.Error()) + } + glog.V(1).Infof("Successfully connected to the config DB for deployment %s", name) + defer session.Close() + + collection := session.DB(configDb.Database).C(configDb.DeploymentCollection) + var deployment deployer.Deployment + if err := collection.Find(bson.M{"name": name}).One(&deployment); err != nil { + return nil, errors.New("Unable to find deployment config from db: " + err.Error()) + } + + return &deployment, nil +} + func (configDb *ConfigDB) GetNodeTypeConfig(region string) (*models.AWSRegionNodeTypeConfig, error) { session, sessionErr := connectMongo(configDb.Url, configDb.Database, configDb.User, configDb.Password) if sessionErr != nil { diff --git a/documents/template.config b/documents/template.config index fb986ca..278fa65 100644 --- a/documents/template.config +++ b/documents/template.config @@ -11,6 +11,7 @@ "benchmarkCollection": "benchmarks", "metricDatabase": "metricdb", "calibrationCollection": "calibration", - "profilingCollection": "profiling" + "profilingCollection": "profiling", + "deploymentCollection": "deployment" } } diff --git a/jobs/clusters.go b/jobs/clusters.go index bb5a932..6898194 100644 --- a/jobs/clusters.go +++ b/jobs/clusters.go @@ -13,6 +13,7 @@ import ( deployer "github.com/hyperpilotio/deployer/apis" "github.com/hyperpilotio/go-utils/log" "github.com/hyperpilotio/workload-profiler/clients" + "github.com/hyperpilotio/workload-profiler/db" "github.com/hyperpilotio/workload-profiler/models" logging "github.com/op/go-logging" "github.com/spf13/viper" @@ -51,6 +52,7 @@ type UnreserveResult struct { type cluster struct { deploymentTemplate string + deploymentFile string deploymentId string runId string state clusterState @@ -59,7 +61,8 @@ type cluster struct { } type Clusters struct { - Store blobstore.BlobStore + ClusterStore blobstore.BlobStore + ConfigDB *db.ConfigDB Config *viper.Viper DeployerClient *clients.DeployerClient mutex sync.Mutex @@ -87,20 +90,22 @@ func ParseStateString(state string) clusterState { type storeCluster struct { DeploymentTemplate string + DeploymentFile string DeploymentId string RunId string State string Created string } -func NewClusters(deployerClient *clients.DeployerClient, config *viper.Viper) (*Clusters, error) { +func NewClusters(deployerClient *clients.DeployerClient, config *viper.Viper, configDb *db.ConfigDB) (*Clusters, error) { clusterStore, err := blobstore.NewBlobStore("WorkloadProfilerClusters", config) if err != nil { return nil, errors.New("Unable to create deployments store: " + err.Error()) } return &Clusters{ - Store: clusterStore, + ClusterStore: clusterStore, + ConfigDB: configDb, Config: config, DeployerClient: deployerClient, Deployments: []*cluster{}, @@ -109,7 +114,7 @@ func NewClusters(deployerClient *clients.DeployerClient, config *viper.Viper) (* } func (clusters *Clusters) ReloadClusterState() error { - existingClusters, err := clusters.Store.LoadAll(func() interface{} { + existingClusters, err := clusters.ClusterStore.LoadAll(func() interface{} { return &storeCluster{} }) @@ -129,6 +134,7 @@ func (clusters *Clusters) ReloadClusterState() error { if deploymentReady { reloadCluster := &cluster{ deploymentTemplate: storeCluster.DeploymentTemplate, + deploymentFile: storeCluster.DeploymentFile, deploymentId: storeCluster.DeploymentId, runId: storeCluster.RunId, state: ParseStateString(storeCluster.State), @@ -143,7 +149,7 @@ func (clusters *Clusters) ReloadClusterState() error { storeClusters = append(storeClusters, reloadCluster) } else { glog.V(1).Infof("Found recovered cluster %s to be not available, deleting from store", storeCluster.DeploymentId) - if err := clusters.Store.Delete(storeCluster.RunId); err != nil { + if err := clusters.ClusterStore.Delete(storeCluster.RunId); err != nil { glog.Errorf("Unable to delete profiler cluster: %s", err.Error()) } } @@ -177,6 +183,7 @@ func (clusters *Clusters) ReloadClusterState() error { func (clusters *Clusters) newStoreCluster(selectedCluster *cluster) (*storeCluster, error) { cluster := &storeCluster{ DeploymentTemplate: selectedCluster.deploymentTemplate, + DeploymentFile: selectedCluster.deploymentFile, DeploymentId: selectedCluster.deploymentId, RunId: selectedCluster.runId, State: GetStateString(selectedCluster.state), @@ -226,6 +233,7 @@ func (clusters *Clusters) ReserveDeployment( selectedCluster = &cluster{ deploymentTemplate: applicationConfig.DeploymentTemplate, + deploymentFile: applicationConfig.DeploymentFile, runId: runId, state: DEPLOYING, created: time.Now(), @@ -244,8 +252,8 @@ func (clusters *Clusters) ReserveDeployment( return } - glog.Infof("New cluster deployed successfully with deployment id %s", *deploymentId) - selectedCluster.deploymentId = *deploymentId + glog.Infof("New cluster deployed successfully with deployment id %s", deploymentId) + selectedCluster.deploymentId = deploymentId selectedCluster.state = RESERVED if err := clusters.storeCluster(selectedCluster); err != nil { @@ -253,7 +261,7 @@ func (clusters *Clusters) ReserveDeployment( } reserveResult <- ReserveResult{ - DeploymentId: *deploymentId, + DeploymentId: deploymentId, } }() } else { @@ -292,7 +300,7 @@ func (clusters *Clusters) ReserveDeployment( selectedCluster.runId = runId if originRunId != "" { - if err := clusters.Store.Delete(originRunId); err != nil { + if err := clusters.ClusterStore.Delete(originRunId); err != nil { log.Errorf("Unable to delete profiler cluster: %s", err.Error()) } } @@ -334,7 +342,7 @@ func (clusters *Clusters) unreserveCluster(cluster *cluster, deleteCluster bool, RunId: cluster.runId, } - if err := clusters.Store.Delete(cluster.runId); err != nil { + if err := clusters.ClusterStore.Delete(cluster.runId); err != nil { glog.Errorf("Unable to delete profiler cluster: %s", err.Error()) } return unreserveResult @@ -354,7 +362,7 @@ func (clusters *Clusters) unreserveCluster(cluster *cluster, deleteCluster bool, clusters.removeDeployment(cluster.runId) - if err := clusters.Store.Delete(cluster.runId); err != nil { + if err := clusters.ClusterStore.Delete(cluster.runId); err != nil { glog.Errorf("Unable to delete profiler cluster: %s", err.Error()) } }() @@ -384,7 +392,7 @@ func (clusters *Clusters) storeCluster(cluster *cluster) error { return fmt.Errorf("Unable to create store cluster for run %s: %s", cluster.runId, err) } - if err := clusters.Store.Store(storeCluster.RunId, storeCluster); err != nil { + if err := clusters.ClusterStore.Store(storeCluster.RunId, storeCluster); err != nil { return fmt.Errorf("Unable to store %s cluster: %s", cluster.runId, err.Error()) } @@ -395,16 +403,34 @@ func (clusters *Clusters) createDeployment( applicationConfig *models.ApplicationConfig, jobDeploymentConfig JobDeploymentConfig, runId string, - log *logging.Logger) (*string, error) { + log *logging.Logger) (string, error) { // TODO: We assume region is us-east-1 and we assume Kubernetes only. clusterDefinition := &deployer.ClusterDefinition{ Nodes: jobDeploymentConfig.GetNodes(), } userId := clusters.Config.GetString("defaultClusterUserId") + if applicationConfig.DeploymentFile != "" { + // Create deployment with deployment file + deployment, err := clusters.ConfigDB.GetDeploymentConfig(applicationConfig.DeploymentFile) + if err != nil { + return "", errors.New("Unable to load deployment from configdb: " + err.Error()) + } + + if userId != "" { + deployment.UserId = userId + } + + deploymentId, err := clusters.DeployerClient.CreateDeployment( + deployment, applicationConfig.LoadTester.Name, log) + if err != nil { + return "", errors.New("Unable to create deployment with deployer client: " + err.Error()) + } + + return deploymentId, nil + } deployment := &deployer.Deployment{ - Region: "us-east-1", Name: "workload-profiler-" + applicationConfig.Name, NodeMapping: []deployer.NodeMapping{}, ClusterDefinition: *clusterDefinition, @@ -417,14 +443,15 @@ func (clusters *Clusters) createDeployment( deployment.UserId = userId } + // Create deployment with template for _, appTask := range applicationConfig.TaskDefinitions { nodeMapping := &deployer.NodeMapping{} if err := clusters.convertBsonType(appTask.NodeMapping, nodeMapping); err != nil { - return nil, errors.New("Unable to convert to nodeMapping: " + err.Error()) + return "", errors.New("Unable to convert to nodeMapping: " + err.Error()) } kubernetesTask := &deployer.KubernetesTask{} if err := clusters.convertBsonType(appTask.TaskDefinition, kubernetesTask); err != nil { - return nil, errors.New("Unable to convert to nodeMapping: " + err.Error()) + return "", errors.New("Unable to convert to nodeMapping: " + err.Error()) } deployment.NodeMapping = append(deployment.NodeMapping, *nodeMapping) @@ -432,10 +459,10 @@ func (clusters *Clusters) createDeployment( append(deployment.KubernetesDeployment.Kubernetes, *kubernetesTask) } - deploymentId, createErr := clusters.DeployerClient.CreateDeployment( + deploymentId, createErr := clusters.DeployerClient.CreateDeploymentWithTemplate( applicationConfig.DeploymentTemplate, deployment, applicationConfig.LoadTester.Name, log) if createErr != nil { - return nil, errors.New("Unable to create deployment: " + createErr.Error()) + return "", errors.New("Unable to create deployment: " + createErr.Error()) } return deploymentId, nil diff --git a/jobs/jobs.go b/jobs/jobs.go index 2929c86..21f5223 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -10,6 +10,7 @@ import ( deployer "github.com/hyperpilotio/deployer/apis" "github.com/hyperpilotio/go-utils/log" "github.com/hyperpilotio/workload-profiler/clients" + "github.com/hyperpilotio/workload-profiler/db" "github.com/hyperpilotio/workload-profiler/models" "github.com/spf13/viper" ) @@ -194,13 +195,13 @@ type JobManager struct { mutex sync.Mutex } -func NewJobManager(config *viper.Viper) (*JobManager, error) { +func NewJobManager(config *viper.Viper, configDb *db.ConfigDB) (*JobManager, error) { deployerClient, err := clients.NewDeployerClient(config) if err != nil { return nil, errors.New("Unable to create new deployer client: " + err.Error()) } - clusters, err := NewClusters(deployerClient, config) + clusters, err := NewClusters(deployerClient, config, configDb) if err != nil { return nil, errors.New("Unable to create clusters object: " + err.Error()) } diff --git a/models/models.go b/models/models.go index 948db85..7aa6a66 100644 --- a/models/models.go +++ b/models/models.go @@ -93,6 +93,7 @@ type Benchmark struct { type ApplicationConfig struct { Name string `bson:"name" json:"name"` + DeploymentFile string `bson:"deploymentFile" json:"deploymentFile"` DeploymentTemplate string `bson:"deploymentTemplate" json:"deploymentTemplate"` TaskDefinitions []ApplicationTask `bson:"taskDefinitions" json:"taskDefinitions"` ServiceNames []string `bson:"serviceNames" json:"serviceNames"` diff --git a/runners/benchmark.go b/runners/benchmark.go index 421d4c1..ec2877a 100644 --- a/runners/benchmark.go +++ b/runners/benchmark.go @@ -401,7 +401,6 @@ func (run *SingleBenchmarkInfluxRun) Run(deploymentId string) error { // TODO: Snapshot influx data //run.snapshotInfluxData() - return nil } From ad3876a44c884bfb72cd137b1179ad459bf8c364 Mon Sep 17 00:00:00 2001 From: William Chen Date: Mon, 16 Oct 2017 23:32:46 +0800 Subject: [PATCH 2/2] [WIP] 1. add deploymentCollection field to configDB 2. modify deployed.config for fit AWS config (in-cluster deployer for GCP not ready for test) TODO: haven't fully go through the test yet. It can push deploy JSON file to in-cluster deployer, but create resource-worker-service fail because of it doesn't contains anything in NodeMapping section. --- clients/deployer.go | 1 + db/db.go | 1 + documents/deployed.config | 9 ++++++--- jobs/clusters.go | 1 - 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/clients/deployer.go b/clients/deployer.go index 2da79a9..53c7359 100644 --- a/clients/deployer.go +++ b/clients/deployer.go @@ -413,6 +413,7 @@ func (client *DeployerClient) CreateDeploymentWithTemplate( deployment *deployer.Deployment, loadTesterName string, log *logging.Logger) (string, error) { + if deploymentTemplate == "" { return "", errors.New("Empty deployment template found") } diff --git a/db/db.go b/db/db.go index fceac2a..6641418 100644 --- a/db/db.go +++ b/db/db.go @@ -46,6 +46,7 @@ func NewConfigDB(config *viper.Viper) *ConfigDB { BenchmarksCollection: config.GetString("database.benchmarkCollection"), NodeTypeCollection: config.GetString("database.nodeTypeCollection"), PreviousGenerationCollection: config.GetString("database.previousGenerationCollection"), + DeploymentCollection: config.GetString("database.deploymentCollection"), } } diff --git a/documents/deployed.config b/documents/deployed.config index 795b87d..5e7598f 100644 --- a/documents/deployed.config +++ b/documents/deployed.config @@ -4,10 +4,12 @@ "analyzerUrl": "http://analyzer:5000", "writeResults": true, "filesPath": "/tmp/profiler", - "userId": "hyperpilot", + "userId": "william", "workerCount": 5, + "awsId": "AKIAIMJUJ75EFI6SU2RA", + "awsSecret": "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", "database": { - "url": "mongo-serve:27017", + "url": "ds161262.mlab.com:61262", "user": "analyzer", "password": "hyperpilot", "configDatabase": "configdb", @@ -19,7 +21,8 @@ "calibrationCollection": "calibration", "profilingCollection": "profiling", "sizingCollection": "sizing", - "allInstanceCollection": "allinstance" + "allInstanceCollection": "allinstance", + "deploymentCollection": "deployments" }, "store": { "type": "simpledb", diff --git a/jobs/clusters.go b/jobs/clusters.go index 6898194..7dcc02b 100644 --- a/jobs/clusters.go +++ b/jobs/clusters.go @@ -458,7 +458,6 @@ func (clusters *Clusters) createDeployment( deployment.KubernetesDeployment.Kubernetes = append(deployment.KubernetesDeployment.Kubernetes, *kubernetesTask) } - deploymentId, createErr := clusters.DeployerClient.CreateDeploymentWithTemplate( applicationConfig.DeploymentTemplate, deployment, applicationConfig.LoadTester.Name, log) if createErr != nil {